diff --git a/config/redis.yml b/config/redis.yml index 83e305459..2448bcb5c 100644 --- a/config/redis.yml +++ b/config/redis.yml @@ -4,3 +4,4 @@ password: openIM123 clusterMode: false db: 0 maxRetry: 10 +poolSize: 100 diff --git a/go.mod b/go.mod index ae30db056..9723208e4 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.72-alpha.9 - github.com/openimsdk/tools v0.0.49-alpha.55 + github.com/openimsdk/tools v0.0.50-alpha.11 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 815b6badf..1a5f77999 100644 --- a/go.sum +++ b/go.sum @@ -321,8 +321,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.72-alpha.9 h1:Dyx4vs88IU4rJ2YcP/TdYp4ww8JjsMkV89hB/Eazx+A= github.com/openimsdk/protocol v0.0.72-alpha.9/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k= -github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= +github.com/openimsdk/tools v0.0.50-alpha.11 h1:ClhkRjUVJWbmOiQ14G6do/ES1a6ZueDITv40Apwq/Tc= +github.com/openimsdk/tools v0.0.50-alpha.11/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 8c1978d48..6de07cfbc 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -237,6 +237,10 @@ func (och *OnlineHistoryRedisConsumerHandler) categorizeMessageLists(totalMsgs [ } func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) { + for _, storageMsg := range storageList { + log.ZDebug(ctx, "handle storage msg", "msg", storageMsg.message.String()) + } + och.toPushTopic(ctx, key, conversationID, notStorageList) var storageMessageList []*sdkws.MsgData for _, msg := range storageList { @@ -311,8 +315,9 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con } } -func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(_ context.Context, key, conversationID string, msgs []*ContextMsg) { +func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) { for _, v := range msgs { + log.ZDebug(ctx, "push msg to topic", "msg", v.message.String()) och.msgDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message) } } diff --git a/internal/push/offlinepush/dummy/push.go b/internal/push/offlinepush/dummy/push.go index 028e7edd3..5698b7294 100644 --- a/internal/push/offlinepush/dummy/push.go +++ b/internal/push/offlinepush/dummy/push.go @@ -17,6 +17,7 @@ package dummy import ( "context" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" + "github.com/openimsdk/tools/log" ) func NewClient() *Dummy { @@ -27,5 +28,6 @@ type Dummy struct { } func (d *Dummy) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error { + log.ZInfo(ctx, "dummy push") return nil } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 79d3a9296..5d0359994 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -93,7 +93,8 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { nowSec := timeutil.GetCurrentTimestampBySecond() if nowSec-sec > 10 { - log.ZWarn(ctx, "long time push msg", nil, "msg", pbData.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec) + prommetrics.MsgLoneTimePushCounter.Inc() + log.ZWarn(ctx, "it’s been a while since the message was sent", nil, "msg", pbData.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec) } var err error switch msgFromMQ.MsgData.SessionType { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index e3ce6ed14..e99d94163 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -1028,7 +1028,7 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf } resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupInfoForSet.GroupID}) if err != nil { - log.ZWarn(ctx, "GetGroupMemberIDs is failed", err) + log.ZWarn(ctx, "GetGroupMemberIDs", err) return } conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} @@ -1125,33 +1125,36 @@ func (g *groupServer) SetGroupInfoEX(ctx context.Context, req *pbgroup.SetGroupI if req.GroupInfoForSetEX.Notification != nil { num-- - func() { - conversation := &pbconversation.ConversationReq{ - ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupInfoForSetEX.GroupID), - ConversationType: constant.ReadGroupChatType, - GroupID: req.GroupInfoForSetEX.GroupID, - } + if req.GroupInfoForSetEX.Notification.Value != "" { + func() { + conversation := &pbconversation.ConversationReq{ + ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupInfoForSetEX.GroupID), + ConversationType: constant.ReadGroupChatType, + GroupID: req.GroupInfoForSetEX.GroupID, + } resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupInfoForSetEX.GroupID}) if err != nil { - log.ZWarn(ctx, "GetGroupMemberIDs is failed.", err) + log.ZWarn(ctx, "GetGroupMemberIDs", err) return } - conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} + conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} if err := g.conversationRpcClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil { - log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) + log.ZWarn(ctx, "SetConversations", err, resp.UserIDs, conversation) } }() - g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}) + g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}) + } } + if req.GroupInfoForSetEX.GroupName != "" { num-- - g.notification.GroupInfoSetNameNotification(ctx, &sdkws.GroupInfoSetNameTips{Group: tips.Group, OpUser: tips.OpUser}) } + if num > 0 { g.notification.GroupInfoSetNotification(ctx, tips) } diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index e87e7c495..4a69b6aed 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -535,28 +535,7 @@ func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, g if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, groupID, entrantUserID); err != nil { return err } - opUserID := mcontext.GetOpUserID(ctx) - var opUser *sdkws.GroupMemberFullInfo - if authverify.IsAppManagerUid(ctx, g.config.Share.IMAdminUserID) { - opUser = &sdkws.GroupMemberFullInfo{ - GroupID: groupID, - UserID: opUserID, - AppMangerLevel: constant.AppAdmin, - } - } else { - users, err := g.getGroupMembers(ctx, groupID, []string{opUserID}) - if err != nil { - return err - } - if len(users) == 0 { - opUser = &sdkws.GroupMemberFullInfo{ - GroupID: groupID, - UserID: opUserID, - } - } else { - opUser = users[0] - } - } + var group *sdkws.GroupInfo group, err = g.getGroupInfo(ctx, groupID) if err != nil { @@ -566,7 +545,11 @@ func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, g if err != nil { return err } - tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users, OpUser: opUser} + + tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users} + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return nil + } g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips) return nil diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index 3d29ad337..913058932 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -121,7 +121,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation), config: config, webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), - queue: memamq.NewMemoryQueue(128, 1024*8), + queue: memamq.NewMemoryQueue(16, 1024*1024), }) return nil } @@ -312,16 +312,20 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *rel if err := s.userRpcClient.Access(ctx, req.UserID); err != nil { return nil, err } + total, friendRequests, err := s.db.PageFriendRequestToMe(ctx, req.UserID, req.Pagination) if err != nil { return nil, err } + resp = &relation.GetPaginationFriendsApplyToResp{} resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userRpcClient.GetUsersInfoMap) if err != nil { return nil, err } + resp.Total = int32(total) + return resp, nil } diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index b43f86557..5edea4377 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -129,10 +129,11 @@ func (r *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts { } func (r *RootCmd) initializeLogger(cmdOpts *CmdOpts) error { - err := log.InitFromConfig( + err := log.InitLoggerFromConfig( cmdOpts.loggerPrefixName, r.processName, + "", "", r.log.RemainLogLevel, r.log.IsStdout, r.log.IsJson, diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 5261e034c..8bd16178d 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -336,7 +336,8 @@ type Redis struct { Password string `mapstructure:"password"` ClusterMode bool `mapstructure:"clusterMode"` DB int `mapstructure:"storage"` - MaxRetry int `mapstructure:"MaxRetry"` + MaxRetry int `mapstructure:"maxRetry"` + PoolSize int `mapstructure:"poolSize"` } type BeforeConfig struct { @@ -474,6 +475,7 @@ func (r *Redis) Build() *redisutil.Config { Password: r.Password, DB: r.DB, MaxRetry: r.MaxRetry, + PoolSize: r.PoolSize, } } diff --git a/pkg/common/prommetrics/grpc_push.go b/pkg/common/prommetrics/grpc_push.go index 0b6c3e76f..5c966310f 100644 --- a/pkg/common/prommetrics/grpc_push.go +++ b/pkg/common/prommetrics/grpc_push.go @@ -23,4 +23,8 @@ var ( Name: "msg_offline_push_failed_total", Help: "The number of msg failed offline pushed", }) + MsgLoneTimePushCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "msg_long_time_push_total", + Help: "The number of messages with a push time exceeding 10 seconds", + }) ) diff --git a/pkg/common/prommetrics/rpc.go b/pkg/common/prommetrics/rpc.go index dc16322da..7162fa7e8 100644 --- a/pkg/common/prommetrics/rpc.go +++ b/pkg/common/prommetrics/rpc.go @@ -47,9 +47,17 @@ func GetGrpcCusMetrics(registerName string, share *config.Share) []prometheus.Co case share.RpcRegisterName.MessageGateway: return []prometheus.Collector{OnlineUserGauge} case share.RpcRegisterName.Msg: - return []prometheus.Collector{SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter} + return []prometheus.Collector{ + SingleChatMsgProcessSuccessCounter, + SingleChatMsgProcessFailedCounter, + GroupChatMsgProcessSuccessCounter, + GroupChatMsgProcessFailedCounter, + } case share.RpcRegisterName.Push: - return []prometheus.Collector{MsgOfflinePushFailedCounter} + return []prometheus.Collector{ + MsgOfflinePushFailedCounter, + MsgLoneTimePushCounter, + } case share.RpcRegisterName.Auth: return []prometheus.Collector{UserLoginCounter} case share.RpcRegisterName.User: diff --git a/pkg/common/storage/cache/redis/batch_handler.go b/pkg/common/storage/cache/redis/batch_handler.go index f9923e198..1fbd664a3 100644 --- a/pkg/common/storage/cache/redis/batch_handler.go +++ b/pkg/common/storage/cache/redis/batch_handler.go @@ -118,7 +118,7 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) { t, err = fn(ctx) if err != nil { - log.ZError(ctx, "getCache query database failed", err, "key", key) + //log.ZError(ctx, "getCache query database failed", err, "key", key) return "", err } bs, err := json.Marshal(t) diff --git a/pkg/common/storage/controller/friend.go b/pkg/common/storage/controller/friend.go index 94cb7d661..88a5fc863 100644 --- a/pkg/common/storage/controller/friend.go +++ b/pkg/common/storage/controller/friend.go @@ -160,7 +160,7 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, if err != nil { return err } - opUserID := mcontext.GetOperationID(ctx) + opUserID := mcontext.GetOpUserID(ctx) friends := make([]*model.Friend, 0, len(friendUserIDs)*2) myFriendsSet := datautil.SliceSetAny(myFriends, func(friend *model.Friend) string { return friend.FriendUserID diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index 2a62c7bbd..925d2a37c 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -86,7 +86,7 @@ func (c *ConversationLocalCache) GetConversation(ctx context.Context, userID, co if err == nil { log.ZDebug(ctx, "ConversationLocalCache GetConversation return", "userID", userID, "conversationID", conversationID, "value", val) } else { - log.ZError(ctx, "ConversationLocalCache GetConversation return", err, "userID", userID, "conversationID", conversationID) + log.ZWarn(ctx, "ConversationLocalCache GetConversation return", err, "userID", userID, "conversationID", conversationID) } }() var cache cacheProto[pbconversation.Conversation] diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 958cb69a6..715014800 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -23,7 +23,6 @@ import ( "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mq/memamq" "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/idutil" @@ -270,8 +269,8 @@ func WithUserRpcClient(userRpcClient *UserRpcClient) NotificationSenderOptions { } const ( - notificationWorkerCount = 2 - notificationBufferSize = 200 + notificationWorkerCount = 16 + notificationBufferSize = 1024 * 1024 * 2 ) func NewNotificationSender(conf *config.Notification, opts ...NotificationSenderOptions) *NotificationSender { @@ -298,7 +297,8 @@ func WithRpcGetUserName() NotificationOptions { } func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, opts ...NotificationOptions) { - ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)}) + //ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)}) + ctx = context.WithoutCancel(ctx) ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(5)) defer cancel() n := sdkws.NotificationElem{Detail: jsonutil.StructToJsonString(m)}