diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 9b18ade7d..81392897b 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -265,7 +265,7 @@ func (ws *WsServer) registerClient(client *Client) { if clientOK { ws.clients.Set(client.UserID, client) // There is already a connection to the platform - log.ZInfo(client.ctx, "repeat login", "userID", client.UserID, "platformID", + log.ZDebug(client.ctx, "repeat login", "userID", client.UserID, "platformID", client.PlatformID, "old remote addr", getRemoteAdders(oldClients)) ws.onlineUserConnNum.Add(1) } else { @@ -293,7 +293,7 @@ func (ws *WsServer) registerClient(client *Client) { wg.Wait() - log.ZInfo( + log.ZDebug( client.ctx, "user online", "online user Num", @@ -360,7 +360,7 @@ func (ws *WsServer) unregisterClient(client *Client) { ws.onlineUserConnNum.Add(-1) ws.subscription.DelClient(client) //ws.SetUserOnlineStatus(client.ctx, client, constant.Offline) - log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", + log.ZDebug(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum.Load(), "online user conn Num", ws.onlineUserConnNum.Load(), ) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 6de07cfbc..ca2434db5 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -97,7 +97,7 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID ctx = mcontext.WithTriggerIDContext(ctx, val.TriggerID()) ctxMessages := och.parseConsumerMessages(ctx, val.Val()) ctx = withAggregationCtx(ctx, ctxMessages) - log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages), + log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages), "key", val.Key()) och.doSetReadSeq(ctx, ctxMessages) @@ -256,7 +256,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key if isNewConversation { switch msg.SessionType { case constant.ReadGroupChatType: - log.ZInfo(ctx, "group chat first create conversation", "conversationID", + log.ZDebug(ctx, "group chat first create conversation", "conversationID", conversationID) userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, msg.GroupID) if err != nil { @@ -343,7 +343,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSess func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group - log.ZInfo(context.Background(), "online new session msg come", "highWaterMarkOffset", + log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) och.redisMessageBatches.OnComplete = func(lastMessage *sarama.ConsumerMessage, totalCount int) { session.MarkMessage(lastMessage, "") diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index cea47fcd5..94d640653 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -56,7 +56,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont log.ZError(ctx, "msgFromMQ.MsgData is empty", nil, "cMsg", cMsg) return } - log.ZInfo(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String()) + log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String()) err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) if err != nil { log.ZError( diff --git a/internal/push/offlinepush/dummy/push.go b/internal/push/offlinepush/dummy/push.go index 5698b7294..09831cabf 100644 --- a/internal/push/offlinepush/dummy/push.go +++ b/internal/push/offlinepush/dummy/push.go @@ -28,6 +28,6 @@ type Dummy struct { } func (d *Dummy) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error { - log.ZInfo(ctx, "dummy push") + log.ZDebug(ctx, "dummy push") return nil } diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index d0c65e06b..9521a84a0 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -27,12 +27,12 @@ func newEmptyOnlinePusher() *emptyOnlinePusher { func (emptyOnlinePusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { - log.ZWarn(ctx, "emptyOnlinePusher GetConnsAndOnlinePush", nil) + log.ZInfo(ctx, "emptyOnlinePusher GetConnsAndOnlinePush", nil) return nil, nil } func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *sdkws.MsgData, wsResults []*msggateway.SingleMsgToUserResults, pushToUserIDs *[]string) []string { - log.ZWarn(ctx, "emptyOnlinePusher GetOnlinePushFailedUserIDs", nil) + log.ZInfo(ctx, "emptyOnlinePusher GetOnlinePushFailedUserIDs", nil) return nil } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index ab2cff3ef..e2ba0a20e 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -40,8 +40,6 @@ import ( "github.com/openimsdk/tools/utils/timeutil" "github.com/redis/go-redis/v9" "google.golang.org/protobuf/proto" - "math/rand" - "strconv" "time" ) @@ -69,19 +67,6 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, return nil, err } userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) - for { - ctx := mcontext.SetOperationID(context.TODO(), strconv.FormatInt(time.Now().UnixNano()+int64(rand.Uint32()), 10)) - conns, err := userRpcClient.Discov.GetConns( - ctx, - config.Share.RpcRegisterName.User, - ) - if err != nil || len(conns) == 0 { - time.Sleep(time.Second) - log.ZWarn(ctx, "waiting for user rpc", err) - } else { - break - } - } consumerHandler.offlinePusher = offlinePusher consumerHandler.onlinePusher = NewOnlinePusher(client, config) @@ -228,48 +213,42 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws. } func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { - //log.ZDebug(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) - log.ZWarn(ctx, "Get group msg from msg_transfer and push msg", nil, "msg", msg.String(), "groupID", groupID) + log.ZInfo(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) defer func(duration time.Time) { t := time.Since(duration) - log.ZWarn(ctx, "Get group msg from msg_transfer and push msg end", nil, "msg", msg.String(), "groupID", groupID, "time cost", t) + log.ZInfo(ctx, "Get group msg from msg_transfer and push msg end", "msg", msg.String(), "groupID", groupID, "time cost", t) }(time.Now()) var pushToUserIDs []string if err = c.webhookBeforeGroupOnlinePush(ctx, &c.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg, &pushToUserIDs); err != nil { return err } - //log.ZDebug(ctx, "webhookBeforeGroupOnlinePush end") - log.ZWarn(ctx, "webhookBeforeGroupOnlinePush end", nil) + log.ZInfo(ctx, "webhookBeforeGroupOnlinePush end") err = c.groupMessagesHandler(ctx, groupID, &pushToUserIDs, msg) if err != nil { return err } - //log.ZDebug(ctx, "groupMessagesHandler end") - log.ZWarn(ctx, "groupMessagesHandler end", nil) + log.ZInfo(ctx, "groupMessagesHandler end") wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs) if err != nil { return err } - //log.ZDebug(ctx, "group push result", "result", wsResults, "msg", msg) - log.ZWarn(ctx, "group push result", nil, "result", wsResults, "msg", msg) + log.ZInfo(ctx, "group push result", "result", wsResults, "msg", msg) if !c.shouldPushOffline(ctx, msg) { return nil } needOfflinePushUserIDs := c.onlinePusher.GetOnlinePushFailedUserIDs(ctx, msg, wsResults, &pushToUserIDs) - //log.ZDebug(ctx, "GetOnlinePushFailedUserIDs end") - log.ZWarn(ctx, "GetOnlinePushFailedUserIDs end", nil) + log.ZInfo(ctx, "GetOnlinePushFailedUserIDs end") //filter some user, like don not disturb or don't need offline push etc. needOfflinePushUserIDs, err = c.filterGroupMessageOfflinePush(ctx, groupID, msg, needOfflinePushUserIDs) if err != nil { return err } - //log.ZDebug(ctx, "filterGroupMessageOfflinePush end") - log.ZWarn(ctx, "filterGroupMessageOfflinePush end", nil) + log.ZInfo(ctx, "filterGroupMessageOfflinePush end") // Use offline push messaging if len(needOfflinePushUserIDs) > 0 { @@ -278,8 +257,7 @@ func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *s if err != nil { return err } - //log.ZDebug(ctx, "webhookBeforeOfflinePush end") - log.ZWarn(ctx, "webhookBeforeOfflinePush end", nil) + log.ZInfo(ctx, "webhookBeforeOfflinePush end") if len(offlinePushUserIDs) > 0 { needOfflinePushUserIDs = offlinePushUserIDs } @@ -327,7 +305,7 @@ func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID stri if unmarshalNotificationElem(msg.Content, &tips) != nil { return err } - log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(*pushToUserIDs), "list", pushToUserIDs) + log.ZDebug(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(*pushToUserIDs), "list", pushToUserIDs) if len(c.config.Share.IMAdminUserID) > 0 { ctx = mcontext.WithOpUserIDContext(ctx, c.config.Share.IMAdminUserID[0]) } diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index 4ffa1f43e..c5bd36b44 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -67,7 +67,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. return nil, err } - log.ZInfo(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) + log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) return &msg.ClearMsgResp{}, nil } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 337272d69..dbb4e34f6 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -79,13 +79,13 @@ func Start(ctx context.Context, config *CronTaskConfig) error { now := time.Now() deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords)) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli())) - log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) + log.ZDebug(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil { log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now)) return } - log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now)) + log.ZDebug(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now)) } if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil { return errs.Wrap(err) @@ -95,7 +95,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { msgDestructFunc := func() { now := time.Now() ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli())) - log.ZInfo(ctx, "msg destruct cron start", "now", now) + log.ZDebug(ctx, "msg destruct cron start", "now", now) conversations, err := conversationClient.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{}) if err != nil { @@ -108,7 +108,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { return } } - log.ZInfo(ctx, "msg destruct cron task completed", "cont", time.Since(now)) + log.ZDebug(ctx, "msg destruct cron task completed", "cont", time.Since(now)) } if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, msgDestructFunc); err != nil { return errs.Wrap(err) @@ -119,18 +119,18 @@ func Start(ctx context.Context, config *CronTaskConfig) error { // now := time.Now() // deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) // ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) - // log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) + // log.ZDebug(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) // if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil { // log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now)) // return // } - // log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) + // log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) // } // if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil { // return errs.Wrap(err) // } - log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) + log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) crontab.Start() <-ctx.Done() return nil diff --git a/pkg/common/webhook/http_client.go b/pkg/common/webhook/http_client.go index e46f08806..14fe51beb 100644 --- a/pkg/common/webhook/http_client.go +++ b/pkg/common/webhook/http_client.go @@ -69,7 +69,7 @@ func (c *Client) AsyncPost(ctx context.Context, command string, req callbackstru func (c *Client) post(ctx context.Context, command string, input interface{}, output callbackstruct.CallbackResp, timeout int) error { ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)}) fullURL := c.url + "/" + command - log.ZInfo(ctx, "webhook", "url", fullURL, "input", input, "config", timeout) + log.ZDebug(ctx, "webhook", "url", fullURL, "input", input, "config", timeout) operationID, _ := ctx.Value(constant.OperationID).(string) b, err := c.client.Post(ctx, fullURL, map[string]string{constant.OperationID: operationID}, input, timeout) if err != nil { @@ -81,6 +81,6 @@ func (c *Client) post(ctx context.Context, command string, input interface{}, ou if err := output.Parse(); err != nil { return err } - log.ZInfo(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b)) + log.ZDebug(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b)) return nil } diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index 730e158ce..4e86560a4 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -89,7 +89,7 @@ func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) error { time.Sleep(time.Second * 10) defer func(t time.Time) { - log.ZWarn(ctx, "init users online status end", nil, "cost", time.Since(t), "totalSet", totalSet) + log.ZInfo(ctx, "init users online status end", "cost", time.Since(t), "totalSet", totalSet) }(time.Now()) for page := int32(1); ; page++ {