diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index d3b0e6a5f..9a4005e6c 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -78,16 +78,17 @@ type Client struct { token string } -func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client { - return &Client{ - w: new(sync.Mutex), - conn: conn, - PlatformID: utils.StringToInt(ctx.GetPlatformID()), - IsCompress: isCompress, - UserID: ctx.GetUserID(), - ctx: ctx, - } -} +// function not used +// func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client { +// return &Client{ +// w: new(sync.Mutex), +// conn: conn, +// PlatformID: utils.StringToInt(ctx.GetPlatformID()), +// IsCompress: isCompress, +// UserID: ctx.GetUserID(), +// ctx: ctx, +// } +// } // ResetClient updates the client's state with new connection and context information. func (c *Client) ResetClient( diff --git a/internal/msggateway/long_conn.go b/internal/msggateway/long_conn.go index 93e5cc33f..a4251a50f 100644 --- a/internal/msggateway/long_conn.go +++ b/internal/msggateway/long_conn.go @@ -108,10 +108,12 @@ func (d *GWebSocket) Dial(urlStr string, requestHeader http.Header) (*http.Respo } func (d *GWebSocket) IsNil() bool { - if d.conn != nil { - return false - } - return true + return d.conn == nil + // + // if d.conn != nil { + // return false + // } + // return true } func (d *GWebSocket) SetConnNil() { diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index c16da7c64..b734dee6d 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -61,11 +61,12 @@ type LongConnServer interface { MessageHandler } -var bufferPool = sync.Pool{ - New: func() any { - return make([]byte, 1024) - }, -} +// bufferPool is unused +// var bufferPool = sync.Pool{ +// New: func() any { +// return make([]byte, 1024) +// }, +// } type WsServer struct { port int diff --git a/internal/msggateway/user_map.go b/internal/msggateway/user_map.go index 052d7de2d..b4cec59fa 100644 --- a/internal/msggateway/user_map.go +++ b/internal/msggateway/user_map.go @@ -58,12 +58,12 @@ func (u *UserMap) Get(key string, platformID int) ([]*Client, bool, bool) { func (u *UserMap) Set(key string, v *Client) { allClients, existed := u.m.Load(key) if existed { - log.ZDebug(context.Background(), "Set existed", "user_id", key, "client", *v) + log.ZDebug(context.Background(), "Set existed", "user_id", key, "client_user_id", v.UserID) oldClients := allClients.([]*Client) oldClients = append(oldClients, v) u.m.Store(key, oldClients) } else { - log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client", *v) + log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client_user_id", v.UserID) var clients []*Client clients = append(clients, v) u.m.Store(key, clients) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 16d8613db..8ef3efd83 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -71,7 +71,7 @@ func StartTransfer(prometheusPort int) error { return err } - if err := client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil { + if err2 := client.CreateRpcRootNodes(config.Config.GetServiceNames()); err2 != nil { return err } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 6678715d4..6f0ee7706 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -74,10 +74,10 @@ type OnlineHistoryRedisConsumerHandler struct { chArrays [ChannelNum]chan Cmd2Value msgDistributionCh chan Cmd2Value - singleMsgSuccessCount uint64 - singleMsgFailedCount uint64 - singleMsgSuccessCountMutex sync.Mutex - singleMsgFailedCountMutex sync.Mutex + // singleMsgSuccessCount uint64 + // singleMsgFailedCount uint64 + // singleMsgSuccessCountMutex sync.Mutex + // singleMsgFailedCountMutex sync.Mutex msgDatabase controller.CommonMsgDatabase conversationRpcClient *rpcclient.ConversationRpcClient @@ -111,62 +111,59 @@ func NewOnlineHistoryRedisConsumerHandler( } func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { - for { - select { - case cmd := <-och.chArrays[channelID]: - switch cmd.Cmd { - case SourceMessages: - msgChannelValue := cmd.Value.(MsgChannelValue) - ctxMsgList := msgChannelValue.ctxMsgList - ctx := msgChannelValue.ctx - log.ZDebug( + for cmd := range och.chArrays[channelID] { + switch cmd.Cmd { + case SourceMessages: + msgChannelValue := cmd.Value.(MsgChannelValue) + ctxMsgList := msgChannelValue.ctxMsgList + ctx := msgChannelValue.ctx + log.ZDebug( + ctx, + "msg arrived channel", + "channel id", + channelID, + "msgList length", + len(ctxMsgList), + "uniqueKey", + msgChannelValue.uniqueKey, + ) + storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList( + ctxMsgList, + ) + log.ZDebug( + ctx, + "msg lens", + "storageMsgList", + len(storageMsgList), + "notStorageMsgList", + len(notStorageMsgList), + "storageNotificationList", + len(storageNotificationList), + "notStorageNotificationList", + len(notStorageNotificationList), + "modifyMsgList", + len(modifyMsgList), + ) + conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message) + conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMsgList[0].message) + och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList) + och.handleNotification( + ctx, + msgChannelValue.uniqueKey, + conversationIDNotification, + storageNotificationList, + notStorageNotificationList, + ) + if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.uniqueKey, conversationIDNotification, modifyMsgList); err != nil { + log.ZError( ctx, - "msg arrived channel", - "channel id", - channelID, - "msgList length", - len(ctxMsgList), + "msg to modify mq error", + err, "uniqueKey", msgChannelValue.uniqueKey, - ) - storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList( - ctxMsgList, - ) - log.ZDebug( - ctx, - "msg lens", - "storageMsgList", - len(storageMsgList), - "notStorageMsgList", - len(notStorageMsgList), - "storageNotificationList", - len(storageNotificationList), - "notStorageNotificationList", - len(notStorageNotificationList), "modifyMsgList", - len(modifyMsgList), + modifyMsgList, ) - conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message) - conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMsgList[0].message) - och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList) - och.handleNotification( - ctx, - msgChannelValue.uniqueKey, - conversationIDNotification, - storageNotificationList, - notStorageNotificationList, - ) - if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.uniqueKey, conversationIDNotification, modifyMsgList); err != nil { - log.ZError( - ctx, - "msg to modify mq error", - err, - "uniqueKey", - msgChannelValue.uniqueKey, - "modifyMsgList", - modifyMsgList, - ) - } } } }