pull/2608/head
icey-yu 4 months ago
parent b28d0fd8fb
commit 7143444bd2

@ -265,7 +265,7 @@ func (ws *WsServer) registerClient(client *Client) {
if clientOK {
ws.clients.Set(client.UserID, client)
// There is already a connection to the platform
log.ZInfo(client.ctx, "repeat login", "userID", client.UserID, "platformID",
log.ZDebug(client.ctx, "repeat login", "userID", client.UserID, "platformID",
client.PlatformID, "old remote addr", getRemoteAdders(oldClients))
ws.onlineUserConnNum.Add(1)
} else {
@ -293,7 +293,7 @@ func (ws *WsServer) registerClient(client *Client) {
wg.Wait()
log.ZInfo(
log.ZDebug(
client.ctx,
"user online",
"online user Num",
@ -360,7 +360,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
ws.onlineUserConnNum.Add(-1)
ws.subscription.DelClient(client)
//ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num",
log.ZDebug(client.ctx, "user offline", "close reason", client.closedErr, "online user Num",
ws.onlineUserNum.Load(), "online user conn Num",
ws.onlineUserConnNum.Load(),
)

@ -97,7 +97,7 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
ctx = mcontext.WithTriggerIDContext(ctx, val.TriggerID())
ctxMessages := och.parseConsumerMessages(ctx, val.Val())
ctx = withAggregationCtx(ctx, ctxMessages)
log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages),
log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages),
"key", val.Key())
och.doSetReadSeq(ctx, ctxMessages)
@ -256,7 +256,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
if isNewConversation {
switch msg.SessionType {
case constant.ReadGroupChatType:
log.ZInfo(ctx, "group chat first create conversation", "conversationID",
log.ZDebug(ctx, "group chat first create conversation", "conversationID",
conversationID)
userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, msg.GroupID)
if err != nil {
@ -343,7 +343,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSess
func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
log.ZInfo(context.Background(), "online new session msg come", "highWaterMarkOffset",
log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
och.redisMessageBatches.OnComplete = func(lastMessage *sarama.ConsumerMessage, totalCount int) {
session.MarkMessage(lastMessage, "")

@ -56,7 +56,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
log.ZError(ctx, "msgFromMQ.MsgData is empty", nil, "cMsg", cMsg)
return
}
log.ZInfo(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String())
log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String())
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
if err != nil {
log.ZError(

@ -28,6 +28,6 @@ type Dummy struct {
}
func (d *Dummy) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error {
log.ZInfo(ctx, "dummy push")
log.ZDebug(ctx, "dummy push")
return nil
}

@ -27,12 +27,12 @@ func newEmptyOnlinePusher() *emptyOnlinePusher {
func (emptyOnlinePusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
log.ZWarn(ctx, "emptyOnlinePusher GetConnsAndOnlinePush", nil)
log.ZInfo(ctx, "emptyOnlinePusher GetConnsAndOnlinePush", nil)
return nil, nil
}
func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *sdkws.MsgData,
wsResults []*msggateway.SingleMsgToUserResults, pushToUserIDs *[]string) []string {
log.ZWarn(ctx, "emptyOnlinePusher GetOnlinePushFailedUserIDs", nil)
log.ZInfo(ctx, "emptyOnlinePusher GetOnlinePushFailedUserIDs", nil)
return nil
}

@ -40,8 +40,6 @@ import (
"github.com/openimsdk/tools/utils/timeutil"
"github.com/redis/go-redis/v9"
"google.golang.org/protobuf/proto"
"math/rand"
"strconv"
"time"
)
@ -69,19 +67,6 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher,
return nil, err
}
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
for {
ctx := mcontext.SetOperationID(context.TODO(), strconv.FormatInt(time.Now().UnixNano()+int64(rand.Uint32()), 10))
conns, err := userRpcClient.Discov.GetConns(
ctx,
config.Share.RpcRegisterName.User,
)
if err != nil || len(conns) == 0 {
time.Sleep(time.Second)
log.ZWarn(ctx, "waiting for user rpc", err)
} else {
break
}
}
consumerHandler.offlinePusher = offlinePusher
consumerHandler.onlinePusher = NewOnlinePusher(client, config)
@ -228,48 +213,42 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.
}
func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
//log.ZDebug(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
log.ZWarn(ctx, "Get group msg from msg_transfer and push msg", nil, "msg", msg.String(), "groupID", groupID)
log.ZInfo(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
defer func(duration time.Time) {
t := time.Since(duration)
log.ZWarn(ctx, "Get group msg from msg_transfer and push msg end", nil, "msg", msg.String(), "groupID", groupID, "time cost", t)
log.ZInfo(ctx, "Get group msg from msg_transfer and push msg end", "msg", msg.String(), "groupID", groupID, "time cost", t)
}(time.Now())
var pushToUserIDs []string
if err = c.webhookBeforeGroupOnlinePush(ctx, &c.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg,
&pushToUserIDs); err != nil {
return err
}
//log.ZDebug(ctx, "webhookBeforeGroupOnlinePush end")
log.ZWarn(ctx, "webhookBeforeGroupOnlinePush end", nil)
log.ZInfo(ctx, "webhookBeforeGroupOnlinePush end")
err = c.groupMessagesHandler(ctx, groupID, &pushToUserIDs, msg)
if err != nil {
return err
}
//log.ZDebug(ctx, "groupMessagesHandler end")
log.ZWarn(ctx, "groupMessagesHandler end", nil)
log.ZInfo(ctx, "groupMessagesHandler end")
wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
if err != nil {
return err
}
//log.ZDebug(ctx, "group push result", "result", wsResults, "msg", msg)
log.ZWarn(ctx, "group push result", nil, "result", wsResults, "msg", msg)
log.ZInfo(ctx, "group push result", "result", wsResults, "msg", msg)
if !c.shouldPushOffline(ctx, msg) {
return nil
}
needOfflinePushUserIDs := c.onlinePusher.GetOnlinePushFailedUserIDs(ctx, msg, wsResults, &pushToUserIDs)
//log.ZDebug(ctx, "GetOnlinePushFailedUserIDs end")
log.ZWarn(ctx, "GetOnlinePushFailedUserIDs end", nil)
log.ZInfo(ctx, "GetOnlinePushFailedUserIDs end")
//filter some user, like don not disturb or don't need offline push etc.
needOfflinePushUserIDs, err = c.filterGroupMessageOfflinePush(ctx, groupID, msg, needOfflinePushUserIDs)
if err != nil {
return err
}
//log.ZDebug(ctx, "filterGroupMessageOfflinePush end")
log.ZWarn(ctx, "filterGroupMessageOfflinePush end", nil)
log.ZInfo(ctx, "filterGroupMessageOfflinePush end")
// Use offline push messaging
if len(needOfflinePushUserIDs) > 0 {
@ -278,8 +257,7 @@ func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *s
if err != nil {
return err
}
//log.ZDebug(ctx, "webhookBeforeOfflinePush end")
log.ZWarn(ctx, "webhookBeforeOfflinePush end", nil)
log.ZInfo(ctx, "webhookBeforeOfflinePush end")
if len(offlinePushUserIDs) > 0 {
needOfflinePushUserIDs = offlinePushUserIDs
}
@ -327,7 +305,7 @@ func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID stri
if unmarshalNotificationElem(msg.Content, &tips) != nil {
return err
}
log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(*pushToUserIDs), "list", pushToUserIDs)
log.ZDebug(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(*pushToUserIDs), "list", pushToUserIDs)
if len(c.config.Share.IMAdminUserID) > 0 {
ctx = mcontext.WithOpUserIDContext(ctx, c.config.Share.IMAdminUserID[0])
}

@ -67,7 +67,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
return nil, err
}
log.ZInfo(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
return &msg.ClearMsgResp{}, nil
}

@ -79,13 +79,13 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
now := time.Now()
deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords))
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
log.ZDebug(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil {
log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now))
return
}
log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now))
log.ZDebug(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now))
}
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil {
return errs.Wrap(err)
@ -95,7 +95,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
msgDestructFunc := func() {
now := time.Now()
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli()))
log.ZInfo(ctx, "msg destruct cron start", "now", now)
log.ZDebug(ctx, "msg destruct cron start", "now", now)
conversations, err := conversationClient.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{})
if err != nil {
@ -108,7 +108,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
return
}
}
log.ZInfo(ctx, "msg destruct cron task completed", "cont", time.Since(now))
log.ZDebug(ctx, "msg destruct cron task completed", "cont", time.Since(now))
}
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, msgDestructFunc); err != nil {
return errs.Wrap(err)
@ -119,18 +119,18 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
// now := time.Now()
// deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime))
// ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()))
// log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
// log.ZDebug(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
// if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil {
// log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now))
// return
// }
// log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
// log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
// }
// if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil {
// return errs.Wrap(err)
// }
log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
crontab.Start()
<-ctx.Done()
return nil

@ -69,7 +69,7 @@ func (c *Client) AsyncPost(ctx context.Context, command string, req callbackstru
func (c *Client) post(ctx context.Context, command string, input interface{}, output callbackstruct.CallbackResp, timeout int) error {
ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)})
fullURL := c.url + "/" + command
log.ZInfo(ctx, "webhook", "url", fullURL, "input", input, "config", timeout)
log.ZDebug(ctx, "webhook", "url", fullURL, "input", input, "config", timeout)
operationID, _ := ctx.Value(constant.OperationID).(string)
b, err := c.client.Post(ctx, fullURL, map[string]string{constant.OperationID: operationID}, input, timeout)
if err != nil {
@ -81,6 +81,6 @@ func (c *Client) post(ctx context.Context, command string, input interface{}, ou
if err := output.Parse(); err != nil {
return err
}
log.ZInfo(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b))
log.ZDebug(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b))
return nil
}

@ -89,7 +89,7 @@ func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) error {
time.Sleep(time.Second * 10)
defer func(t time.Time) {
log.ZWarn(ctx, "init users online status end", nil, "cost", time.Since(t), "totalSet", totalSet)
log.ZInfo(ctx, "init users online status end", "cost", time.Since(t), "totalSet", totalSet)
}(time.Now())
for page := int32(1); ; page++ {

Loading…
Cancel
Save