diff --git a/internal/msggateway/compressor_test.go b/internal/msggateway/compressor_test.go index b1544f063..bb7106d9f 100644 --- a/internal/msggateway/compressor_test.go +++ b/internal/msggateway/compressor_test.go @@ -37,10 +37,16 @@ func TestCompressDecompress(t *testing.T) { // compress dest, err := compressor.CompressWithPool(src) + if err != nil { + t.Log(err) + } assert.Equal(t, nil, err) // decompress res, err := compressor.DecompressWithPool(dest) + if err != nil { + t.Log(err) + } assert.Equal(t, nil, err) // check @@ -60,10 +66,16 @@ func TestCompressDecompressWithConcurrency(t *testing.T) { // compress dest, err := compressor.CompressWithPool(src) + if err != nil { + t.Log(err) + } assert.Equal(t, nil, err) // decompress res, err := compressor.DecompressWithPool(dest) + if err != nil { + t.Log(err) + } assert.Equal(t, nil, err) // check @@ -99,6 +111,7 @@ func BenchmarkDecompress(b *testing.B) { compressor := NewGzipCompressor() comdata, err := compressor.Compress(src) + assert.Equal(b, nil, err) for i := 0; i < b.N; i++ { diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index b18efcd50..dda2d5e5d 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -23,14 +23,7 @@ import ( // RunWsAndServer run ws server. func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error { - fmt.Println( - "start rpc/msg_gateway server, port: ", - rpcPort, - wsPort, - prometheusPort, - ", OpenIM version: ", - config.Version, - ) + fmt.Println("start rpc/msg_gateway server, port: ", rpcPort, wsPort, prometheusPort, ", OpenIM version: ",config.Version) longServer, err := NewWsServer( WithPort(wsPort), WithMaxConnNum(int64(config.Config.LongConnSvr.WebsocketMaxConnNum)), diff --git a/internal/msggateway/long_conn.go b/internal/msggateway/long_conn.go index d475aa52b..7dc79c834 100644 --- a/internal/msggateway/long_conn.go +++ b/internal/msggateway/long_conn.go @@ -74,7 +74,8 @@ func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) er conn, err := upgrader.Upgrade(w, r, nil) if err != nil { - return err + // The upgrader.Upgrade method usually returns enough error messages to diagnose problems that may occur during the upgrade + return errs.Wrap(err, "GenerateLongConn: WebSocket upgrade failed") } d.conn = conn return nil @@ -99,15 +100,15 @@ func (d *GWebSocket) SetReadDeadline(timeout time.Duration) error { func (d *GWebSocket) SetWriteDeadline(timeout time.Duration) error { // TODO add error - if timeout <= 0 { - return errs.Wrap(errors.New("timeout must be greater than 0")) - } + if timeout <= 0 { + return errs.Wrap(errors.New("timeout must be greater than 0")) + } // TODO SetWriteDeadline Future add error handling if err := d.conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil { - return errs.Wrap(err, "GWebSocket.SetWriteDeadline failed") - } - return nil + return errs.Wrap(err, "GWebSocket.SetWriteDeadline failed") + } + return nil } func (d *GWebSocket) Dial(urlStr string, requestHeader http.Header) (*http.Response, error) { diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index dd5e00f18..24198f465 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -20,6 +20,7 @@ import ( "github.com/OpenIMSDK/protocol/push" "github.com/OpenIMSDK/tools/discoveryregistry" + "github.com/OpenIMSDK/tools/errs" "github.com/go-playground/validator/v10" "google.golang.org/protobuf/proto" @@ -119,18 +120,18 @@ func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDi func (g GrpcHandler) GetSeq(context context.Context, data *Req) ([]byte, error) { req := sdkws.GetMaxSeqReq{} if err := proto.Unmarshal(data.Data, &req); err != nil { - return nil, err + return nil, errs.Wrap(err, "GetSeq: error unmarshaling request") } if err := g.validate.Struct(&req); err != nil { - return nil, err + return nil, errs.Wrap(err, "GetSeq: validation failed") } resp, err := g.msgRpcClient.GetMaxSeq(context, &req) if err != nil { - return nil, err + return nil, errs.Wrap(err, "GetSeq: error calling GetMaxSeq on msgRpcClient") } c, err := proto.Marshal(resp) if err != nil { - return nil, err + return nil, errs.Wrap(err, "GetSeq: error marshaling response") } return c, nil } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 9b1b853f2..b5f8516f8 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -45,8 +45,13 @@ import ( ) type MsgTransfer struct { - historyCH *OnlineHistoryRedisConsumerHandler // 这个消费者聚合消息, 订阅的topic:ws2ms_chat, 修改通知发往msg_to_modify topic, 消息存入redis后Incr Redis, 再发消息到ms2pschat topic推送, 发消息到msg_to_mongo topic持久化 - historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息,以及处理删除通知消息删除的 订阅的topic: msg_to_mongo + // This consumer aggregated messages, subscribed to the topic:ws2ms_chat, + // the modification notification is sent to msg_to_modify topic, the message is stored in redis, Incr Redis, + // and then the message is sent to ms2pschat topic for push, and the message is sent to msg_to_mongo topic for persistence + historyCH *OnlineHistoryRedisConsumerHandler + // mongoDB batch insert, delete messages in redis after success, + // and handle the deletion notification message deleted subscriptions topic: msg_to_mongo + historyMongoCH *OnlineHistoryMongoConsumerHandler ctx context.Context cancel context.CancelFunc } @@ -65,6 +70,7 @@ func StartTransfer(prometheusPort int) error { if err = mongo.CreateMsgIndex(); err != nil { return err } + client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) if err != nil { return err @@ -73,6 +79,7 @@ func StartTransfer(prometheusPort int) error { if err := client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil { return err } + client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) msgModel := cache.NewMsgCacheModel(rdb) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase()) @@ -106,7 +113,7 @@ func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcCli } func (m *MsgTransfer) Start(prometheusPort int) error { - fmt.Println("start msg transfer", "prometheusPort:", prometheusPort) + fmt.Println("Start msg transfer", "prometheusPort:", prometheusPort) if prometheusPort <= 0 { return errs.Wrap(errors.New("prometheusPort not correct")) } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index be7c36718..5023ac008 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -155,15 +155,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { notStorageNotificationList, ) if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.uniqueKey, conversationIDNotification, modifyMsgList); err != nil { - log.ZError( - ctx, - "msg to modify mq error", - err, - "uniqueKey", - msgChannelValue.uniqueKey, - "modifyMsgList", - modifyMsgList, - ) + log.ZError(ctx, "msg to modify mq error", err, "uniqueKey", msgChannelValue.uniqueKey, "modifyMsgList", modifyMsgList) } } } diff --git a/internal/push/callback.go b/internal/push/callback.go index a572fa572..5ce2b7049 100644 --- a/internal/push/callback.go +++ b/internal/push/callback.go @@ -31,12 +31,7 @@ func url() string { return config.Config.Callback.CallbackUrl } -func callbackOfflinePush( - ctx context.Context, - userIDs []string, - msg *sdkws.MsgData, - offlinePushUserIDs *[]string, -) error { +func callbackOfflinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error { if !config.Config.Callback.CallbackOfflinePush.Enable || msg.ContentType == constant.Typing { return nil } @@ -59,10 +54,12 @@ func callbackOfflinePush( AtUserIDs: msg.AtUserIDList, Content: GetContent(msg), } + resp := &callbackstruct.CallbackBeforePushResp{} if err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOfflinePush); err != nil { return err } + if len(resp.UserIDs) != 0 { *offlinePushUserIDs = resp.UserIDs } diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index 7fc456a1d..cd1135633 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -25,7 +25,6 @@ import ( "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" - "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) @@ -102,7 +101,10 @@ func PostReturn(ctx context.Context, url string, header map[string]string, input return err } err = json.Unmarshal(b, output) - return err + if err != nil { + return errs.Wrap(err, "PostReturn: JSON unmarshal failed") + } + return nil } func callBackPostReturn(ctx context.Context, url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error { @@ -127,7 +129,6 @@ func callBackPostReturn(ctx context.Context, url, command string, input interfac } if err := output.Parse(); err != nil { log.ZWarn(ctx, "callback parse failed", err, "url", url, "input", input, "response", string(b)) - return err } log.ZInfo(ctx, "callback success", "url", url, "input", input, "response", string(b)) return nil diff --git a/pkg/common/kafka/consumer.go b/pkg/common/kafka/consumer.go index 98adfdcf1..d0e06d482 100644 --- a/pkg/common/kafka/consumer.go +++ b/pkg/common/kafka/consumer.go @@ -17,9 +17,8 @@ package kafka import ( "sync" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/IBM/sarama" + "github.com/OpenIMSDK/tools/errs" ) type Consumer struct { @@ -30,28 +29,33 @@ type Consumer struct { Consumer sarama.Consumer } -func NewKafkaConsumer(addr []string, topic string) *Consumer { - p := Consumer{} - p.Topic = topic - p.addr = addr - consumerConfig := sarama.NewConfig() - if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" { - consumerConfig.Net.SASL.Enable = true - consumerConfig.Net.SASL.User = config.Config.Kafka.Username - consumerConfig.Net.SASL.Password = config.Config.Kafka.Password +func NewKafkaConsumer(addr []string, topic string, kafkaConfig *sarama.Config) (*Consumer, error) { + p := Consumer{ + Topic: topic, + addr: addr, + } + + if kafkaConfig.Net.SASL.User != "" && kafkaConfig.Net.SASL.Password != "" { + kafkaConfig.Net.SASL.Enable = true + } + + err := SetupTLSConfig(kafkaConfig) + if err != nil { + return nil, err } - SetupTLSConfig(consumerConfig) - consumer, err := sarama.NewConsumer(p.addr, consumerConfig) + + consumer, err := sarama.NewConsumer(p.addr, kafkaConfig) if err != nil { - panic(err.Error()) + return nil, errs.Wrap(err, "NewKafkaConsumer: creating consumer failed") } p.Consumer = consumer partitionList, err := consumer.Partitions(p.Topic) if err != nil { - panic(err.Error()) + return nil, errs.Wrap(err, "NewKafkaConsumer: getting partitions failed") } p.PartitionList = partitionList - return &p + return &p, nil + } diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index bdb839404..54d99a043 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -94,7 +94,6 @@ func NewKafkaProducer(addr []string, topic string) (*Producer, error) { } time.Sleep(1 * time.Second) // Wait before retrying } - // Panic if unable to create producer after retries if err != nil { return nil, errs.Wrap(errors.New("failed to create Kafka producer: " + err.Error())) diff --git a/pkg/common/kafka/util.go b/pkg/common/kafka/util.go index f318ecf73..bdbe56fec 100644 --- a/pkg/common/kafka/util.go +++ b/pkg/common/kafka/util.go @@ -20,22 +20,28 @@ import ( "strings" "github.com/IBM/sarama" + "github.com/OpenIMSDK/tools/errs" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/tls" ) // SetupTLSConfig set up the TLS config from config file. -func SetupTLSConfig(cfg *sarama.Config) { +func SetupTLSConfig(cfg *sarama.Config) error { if config.Config.Kafka.TLS != nil { cfg.Net.TLS.Enable = true - cfg.Net.TLS.Config = tls.NewTLSConfig( + tlsConfig, err := tls.NewTLSConfig( config.Config.Kafka.TLS.ClientCrt, config.Config.Kafka.TLS.ClientKey, config.Config.Kafka.TLS.CACrt, []byte(config.Config.Kafka.TLS.ClientKeyPwd), ) + if err != nil { + return errs.Wrap(err, "SetupTLSConfig: failed to set up TLS config") + } + cfg.Net.TLS.Config = tlsConfig } + return nil } // getEnvOrConfig returns the value of the environment variable if it exists, diff --git a/pkg/common/tls/tls.go b/pkg/common/tls/tls.go index dba49e605..a52f46df7 100755 --- a/pkg/common/tls/tls.go +++ b/pkg/common/tls/tls.go @@ -21,6 +21,7 @@ import ( "errors" "os" + "github.com/OpenIMSDK/tools/errs" "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) @@ -49,37 +50,41 @@ func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) { } // NewTLSConfig setup the TLS config from general config file. -func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte) *tls.Config { - tlsConfig := tls.Config{} +func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte) (*tls.Config, error) { + var tlsConfig tls.Config if clientCertFile != "" && clientKeyFile != "" { certPEMBlock, err := os.ReadFile(clientCertFile) if err != nil { - panic(err) + return nil, errs.Wrap(err, "NewTLSConfig: failed to read client cert file") } + keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd) if err != nil { - panic(err) + return nil, err } + cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock) if err != nil { - panic(err) + return nil, errs.Wrap(err, "NewTLSConfig: failed to create X509 key pair") } tlsConfig.Certificates = []tls.Certificate{cert} } - caCert, err := os.ReadFile(caCertFile) - if err != nil { - panic(err) - } - caCertPool := x509.NewCertPool() - ok := caCertPool.AppendCertsFromPEM(caCert) - if !ok { - panic(errors.New("not a valid CA cert")) + if caCertFile != "" { + caCert, err := os.ReadFile(caCertFile) + if err != nil { + return nil, errs.Wrap(err, "NewTLSConfig: failed to read CA cert file") + } + + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { + return nil, errors.New("NewTLSConfig: not a valid CA cert") + } + tlsConfig.RootCAs = caCertPool } - tlsConfig.RootCAs = caCertPool tlsConfig.InsecureSkipVerify = config.Config.Kafka.TLS.InsecureSkipVerify - return &tlsConfig + return &tlsConfig, nil } diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index 3ba8dd8c0..ecbe86d95 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -23,6 +23,7 @@ import ( pbconversation "github.com/OpenIMSDK/protocol/conversation" "github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/errs" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) @@ -36,7 +37,7 @@ type Conversation struct { func NewConversation(discov discoveryregistry.SvcDiscoveryRegistry) *Conversation { conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImConversationName) if err != nil { - panic(err) + util.ExitWithError(err) } client := pbconversation.NewConversationClient(conn) return &Conversation{discov: discov, conn: conn, Client: client} diff --git a/pkg/rpcclient/friend.go b/pkg/rpcclient/friend.go index b84db40d4..39026b10c 100644 --- a/pkg/rpcclient/friend.go +++ b/pkg/rpcclient/friend.go @@ -24,6 +24,7 @@ import ( "github.com/OpenIMSDK/tools/discoveryregistry" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) type Friend struct { @@ -35,7 +36,7 @@ type Friend struct { func NewFriend(discov discoveryregistry.SvcDiscoveryRegistry) *Friend { conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImFriendName) if err != nil { - panic(err) + util.ExitWithError(err) } client := friend.NewFriendClient(conn) return &Friend{discov: discov, conn: conn, Client: client} @@ -62,7 +63,7 @@ func (f *FriendRpcClient) GetFriendsInfo( return } -// possibleFriendUserID是否在userID的好友中. +// possibleFriendUserID Is PossibleFriendUserId's friends. func (f *FriendRpcClient) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (bool, error) { resp, err := f.Client.IsFriend(ctx, &friend.IsFriendReq{UserID1: userID, UserID2: possibleFriendUserID}) if err != nil { diff --git a/pkg/rpcclient/group.go b/pkg/rpcclient/group.go index bf0efe60c..cb61579be 100644 --- a/pkg/rpcclient/group.go +++ b/pkg/rpcclient/group.go @@ -28,6 +28,7 @@ import ( "github.com/OpenIMSDK/tools/utils" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) type Group struct { @@ -39,7 +40,7 @@ type Group struct { func NewGroup(discov discoveryregistry.SvcDiscoveryRegistry) *Group { conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImGroupName) if err != nil { - panic(err) + util.ExitWithError(err) } client := group.NewGroupClient(conn) return &Group{discov: discov, conn: conn, Client: client} diff --git a/pkg/rpcclient/push.go b/pkg/rpcclient/push.go index 6d0876972..b5a5c49cf 100644 --- a/pkg/rpcclient/push.go +++ b/pkg/rpcclient/push.go @@ -23,6 +23,7 @@ import ( "github.com/OpenIMSDK/tools/discoveryregistry" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) type Push struct { @@ -34,7 +35,7 @@ type Push struct { func NewPush(discov discoveryregistry.SvcDiscoveryRegistry) *Push { conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImPushName) if err != nil { - panic(err) + util.ExitWithError(err) } return &Push{ discov: discov, @@ -49,9 +50,6 @@ func NewPushRpcClient(discov discoveryregistry.SvcDiscoveryRegistry) PushRpcClie return PushRpcClient(*NewPush(discov)) } -func (p *PushRpcClient) DelUserPushToken( - ctx context.Context, - req *push.DelUserPushTokenReq, -) (*push.DelUserPushTokenResp, error) { +func (p *PushRpcClient) DelUserPushToken(ctx context.Context, req *push.DelUserPushTokenReq) (*push.DelUserPushTokenResp, error) { return p.Client.DelUserPushToken(ctx, req) } diff --git a/pkg/rpcclient/third.go b/pkg/rpcclient/third.go index b3557bf83..05b825060 100755 --- a/pkg/rpcclient/third.go +++ b/pkg/rpcclient/third.go @@ -24,8 +24,10 @@ import ( "github.com/OpenIMSDK/protocol/third" "github.com/OpenIMSDK/tools/discoveryregistry" + "github.com/OpenIMSDK/tools/errs" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) type Third struct { @@ -38,35 +40,42 @@ type Third struct { func NewThird(discov discoveryregistry.SvcDiscoveryRegistry) *Third { conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImThirdName) if err != nil { - panic(err) + util.ExitWithError(err) } client := third.NewThirdClient(conn) minioClient, err := minioInit() if err != nil { - panic(err) + util.ExitWithError(err) } return &Third{discov: discov, Client: client, conn: conn, MinioClient: minioClient} } func minioInit() (*minio.Client, error) { - minioClient := &minio.Client{} - initUrl := config.Config.Object.Minio.Endpoint - minioUrl, err := url.Parse(initUrl) + // Retrieve MinIO configuration details + endpoint := config.Config.Object.Minio.Endpoint + accessKeyID := config.Config.Object.Minio.AccessKeyID + secretAccessKey := config.Config.Object.Minio.SecretAccessKey + + // Parse the MinIO URL to determine if the connection should be secure + minioURL, err := url.Parse(endpoint) if err != nil { - return nil, err + return nil, errs.Wrap(err, "minioInit: failed to parse MinIO endpoint URL") } + + // Determine the security of the connection based on the scheme + secure := minioURL.Scheme == "https" + + // Setup MinIO client options opts := &minio.Options{ - Creds: credentials.NewStaticV4(config.Config.Object.Minio.AccessKeyID, config.Config.Object.Minio.SecretAccessKey, ""), - // Region: config.Config.Credential.Minio.Location, + Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), + Secure: secure, } - if minioUrl.Scheme == "http" { - opts.Secure = false - } else if minioUrl.Scheme == "https" { - opts.Secure = true - } - minioClient, err = minio.New(minioUrl.Host, opts) + + // Initialize MinIO client + minioClient, err := minio.New(minioURL.Host, opts) if err != nil { - return nil, err + return nil, errs.Wrap(err, "minioInit: failed to create MinIO client") } + return minioClient, nil } diff --git a/pkg/rpcclient/user.go b/pkg/rpcclient/user.go index 451914cd3..810a88ad0 100644 --- a/pkg/rpcclient/user.go +++ b/pkg/rpcclient/user.go @@ -19,6 +19,7 @@ import ( "strings" "github.com/openimsdk/open-im-server/v3/pkg/authverify" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" "google.golang.org/grpc" @@ -42,7 +43,7 @@ type User struct { func NewUser(discov discoveryregistry.SvcDiscoveryRegistry) *User { conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImUserName) if err != nil { - panic(err) + util.ExitWithError(err) } client := user.NewUserClient(conn) return &User{Discov: discov, Client: client, conn: conn}