Merge branch 'main' of github.com:openimsdk/open-im-server

pull/2591/head
Monet Lee 1 year ago
commit c4ee8c5ecd

@ -4,3 +4,4 @@ password: openIM123
clusterMode: false clusterMode: false
db: 0 db: 0
maxRetry: 10 maxRetry: 10
poolSize: 100

@ -13,7 +13,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.9 github.com/openimsdk/protocol v0.0.72-alpha.9
github.com/openimsdk/tools v0.0.49-alpha.55 github.com/openimsdk/tools v0.0.50-alpha.11
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0 github.com/stretchr/testify v1.9.0

@ -321,8 +321,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.9 h1:Dyx4vs88IU4rJ2YcP/TdYp4ww8JjsMkV89hB/Eazx+A= github.com/openimsdk/protocol v0.0.72-alpha.9 h1:Dyx4vs88IU4rJ2YcP/TdYp4ww8JjsMkV89hB/Eazx+A=
github.com/openimsdk/protocol v0.0.72-alpha.9/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/protocol v0.0.72-alpha.9/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k= github.com/openimsdk/tools v0.0.50-alpha.11 h1:ClhkRjUVJWbmOiQ14G6do/ES1a6ZueDITv40Apwq/Tc=
github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/openimsdk/tools v0.0.50-alpha.11/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

@ -237,6 +237,10 @@ func (och *OnlineHistoryRedisConsumerHandler) categorizeMessageLists(totalMsgs [
} }
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) { func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) {
for _, storageMsg := range storageList {
log.ZDebug(ctx, "handle storage msg", "msg", storageMsg.message.String())
}
och.toPushTopic(ctx, key, conversationID, notStorageList) och.toPushTopic(ctx, key, conversationID, notStorageList)
var storageMessageList []*sdkws.MsgData var storageMessageList []*sdkws.MsgData
for _, msg := range storageList { for _, msg := range storageList {
@ -311,8 +315,9 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
} }
} }
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(_ context.Context, key, conversationID string, msgs []*ContextMsg) { func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) {
for _, v := range msgs { for _, v := range msgs {
log.ZDebug(ctx, "push msg to topic", "msg", v.message.String())
och.msgDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message) och.msgDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message)
} }
} }

@ -17,6 +17,7 @@ package dummy
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
"github.com/openimsdk/tools/log"
) )
func NewClient() *Dummy { func NewClient() *Dummy {
@ -27,5 +28,6 @@ type Dummy struct {
} }
func (d *Dummy) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error { func (d *Dummy) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error {
log.ZInfo(ctx, "dummy push")
return nil return nil
} }

@ -93,7 +93,8 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
nowSec := timeutil.GetCurrentTimestampBySecond() nowSec := timeutil.GetCurrentTimestampBySecond()
if nowSec-sec > 10 { if nowSec-sec > 10 {
log.ZWarn(ctx, "long time push msg", nil, "msg", pbData.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec) prommetrics.MsgLoneTimePushCounter.Inc()
log.ZWarn(ctx, "its been a while since the message was sent", nil, "msg", pbData.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec)
} }
var err error var err error
switch msgFromMQ.MsgData.SessionType { switch msgFromMQ.MsgData.SessionType {

@ -1028,7 +1028,7 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf
} }
resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupInfoForSet.GroupID}) resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupInfoForSet.GroupID})
if err != nil { if err != nil {
log.ZWarn(ctx, "GetGroupMemberIDs is failed", err) log.ZWarn(ctx, "GetGroupMemberIDs", err)
return return
} }
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification}
@ -1125,33 +1125,36 @@ func (g *groupServer) SetGroupInfoEX(ctx context.Context, req *pbgroup.SetGroupI
if req.GroupInfoForSetEX.Notification != nil { if req.GroupInfoForSetEX.Notification != nil {
num-- num--
func() { if req.GroupInfoForSetEX.Notification.Value != "" {
conversation := &pbconversation.ConversationReq{ func() {
ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupInfoForSetEX.GroupID), conversation := &pbconversation.ConversationReq{
ConversationType: constant.ReadGroupChatType, ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupInfoForSetEX.GroupID),
GroupID: req.GroupInfoForSetEX.GroupID, ConversationType: constant.ReadGroupChatType,
} GroupID: req.GroupInfoForSetEX.GroupID,
}
resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupInfoForSetEX.GroupID}) resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupInfoForSetEX.GroupID})
if err != nil { if err != nil {
log.ZWarn(ctx, "GetGroupMemberIDs is failed.", err) log.ZWarn(ctx, "GetGroupMemberIDs", err)
return return
} }
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification}
if err := g.conversationRpcClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil { if err := g.conversationRpcClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil {
log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) log.ZWarn(ctx, "SetConversations", err, resp.UserIDs, conversation)
} }
}() }()
g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}) g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser})
}
} }
if req.GroupInfoForSetEX.GroupName != "" { if req.GroupInfoForSetEX.GroupName != "" {
num-- num--
g.notification.GroupInfoSetNameNotification(ctx, &sdkws.GroupInfoSetNameTips{Group: tips.Group, OpUser: tips.OpUser}) g.notification.GroupInfoSetNameNotification(ctx, &sdkws.GroupInfoSetNameTips{Group: tips.Group, OpUser: tips.OpUser})
} }
if num > 0 { if num > 0 {
g.notification.GroupInfoSetNotification(ctx, tips) g.notification.GroupInfoSetNotification(ctx, tips)
} }

@ -535,28 +535,7 @@ func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, g
if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, groupID, entrantUserID); err != nil { if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, groupID, entrantUserID); err != nil {
return err return err
} }
opUserID := mcontext.GetOpUserID(ctx)
var opUser *sdkws.GroupMemberFullInfo
if authverify.IsAppManagerUid(ctx, g.config.Share.IMAdminUserID) {
opUser = &sdkws.GroupMemberFullInfo{
GroupID: groupID,
UserID: opUserID,
AppMangerLevel: constant.AppAdmin,
}
} else {
users, err := g.getGroupMembers(ctx, groupID, []string{opUserID})
if err != nil {
return err
}
if len(users) == 0 {
opUser = &sdkws.GroupMemberFullInfo{
GroupID: groupID,
UserID: opUserID,
}
} else {
opUser = users[0]
}
}
var group *sdkws.GroupInfo var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID) group, err = g.getGroupInfo(ctx, groupID)
if err != nil { if err != nil {
@ -566,7 +545,11 @@ func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, g
if err != nil { if err != nil {
return err return err
} }
tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users, OpUser: opUser}
tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users}
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return nil
}
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips)
return nil return nil

@ -121,7 +121,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation), conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation),
config: config, config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
queue: memamq.NewMemoryQueue(128, 1024*8), queue: memamq.NewMemoryQueue(16, 1024*1024),
}) })
return nil return nil
} }
@ -312,16 +312,20 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *rel
if err := s.userRpcClient.Access(ctx, req.UserID); err != nil { if err := s.userRpcClient.Access(ctx, req.UserID); err != nil {
return nil, err return nil, err
} }
total, friendRequests, err := s.db.PageFriendRequestToMe(ctx, req.UserID, req.Pagination) total, friendRequests, err := s.db.PageFriendRequestToMe(ctx, req.UserID, req.Pagination)
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp = &relation.GetPaginationFriendsApplyToResp{} resp = &relation.GetPaginationFriendsApplyToResp{}
resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userRpcClient.GetUsersInfoMap) resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userRpcClient.GetUsersInfoMap)
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp.Total = int32(total) resp.Total = int32(total)
return resp, nil return resp, nil
} }

@ -129,10 +129,11 @@ func (r *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts {
} }
func (r *RootCmd) initializeLogger(cmdOpts *CmdOpts) error { func (r *RootCmd) initializeLogger(cmdOpts *CmdOpts) error {
err := log.InitFromConfig( err := log.InitLoggerFromConfig(
cmdOpts.loggerPrefixName, cmdOpts.loggerPrefixName,
r.processName, r.processName,
"", "",
r.log.RemainLogLevel, r.log.RemainLogLevel,
r.log.IsStdout, r.log.IsStdout,
r.log.IsJson, r.log.IsJson,

@ -336,7 +336,8 @@ type Redis struct {
Password string `mapstructure:"password"` Password string `mapstructure:"password"`
ClusterMode bool `mapstructure:"clusterMode"` ClusterMode bool `mapstructure:"clusterMode"`
DB int `mapstructure:"storage"` DB int `mapstructure:"storage"`
MaxRetry int `mapstructure:"MaxRetry"` MaxRetry int `mapstructure:"maxRetry"`
PoolSize int `mapstructure:"poolSize"`
} }
type BeforeConfig struct { type BeforeConfig struct {
@ -474,6 +475,7 @@ func (r *Redis) Build() *redisutil.Config {
Password: r.Password, Password: r.Password,
DB: r.DB, DB: r.DB,
MaxRetry: r.MaxRetry, MaxRetry: r.MaxRetry,
PoolSize: r.PoolSize,
} }
} }

@ -23,4 +23,8 @@ var (
Name: "msg_offline_push_failed_total", Name: "msg_offline_push_failed_total",
Help: "The number of msg failed offline pushed", Help: "The number of msg failed offline pushed",
}) })
MsgLoneTimePushCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "msg_long_time_push_total",
Help: "The number of messages with a push time exceeding 10 seconds",
})
) )

@ -47,9 +47,17 @@ func GetGrpcCusMetrics(registerName string, share *config.Share) []prometheus.Co
case share.RpcRegisterName.MessageGateway: case share.RpcRegisterName.MessageGateway:
return []prometheus.Collector{OnlineUserGauge} return []prometheus.Collector{OnlineUserGauge}
case share.RpcRegisterName.Msg: case share.RpcRegisterName.Msg:
return []prometheus.Collector{SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter} return []prometheus.Collector{
SingleChatMsgProcessSuccessCounter,
SingleChatMsgProcessFailedCounter,
GroupChatMsgProcessSuccessCounter,
GroupChatMsgProcessFailedCounter,
}
case share.RpcRegisterName.Push: case share.RpcRegisterName.Push:
return []prometheus.Collector{MsgOfflinePushFailedCounter} return []prometheus.Collector{
MsgOfflinePushFailedCounter,
MsgLoneTimePushCounter,
}
case share.RpcRegisterName.Auth: case share.RpcRegisterName.Auth:
return []prometheus.Collector{UserLoginCounter} return []prometheus.Collector{UserLoginCounter}
case share.RpcRegisterName.User: case share.RpcRegisterName.User:

@ -118,7 +118,7 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) { v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) {
t, err = fn(ctx) t, err = fn(ctx)
if err != nil { if err != nil {
log.ZError(ctx, "getCache query database failed", err, "key", key) //log.ZError(ctx, "getCache query database failed", err, "key", key)
return "", err return "", err
} }
bs, err := json.Marshal(t) bs, err := json.Marshal(t)

@ -160,7 +160,7 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string,
if err != nil { if err != nil {
return err return err
} }
opUserID := mcontext.GetOperationID(ctx) opUserID := mcontext.GetOpUserID(ctx)
friends := make([]*model.Friend, 0, len(friendUserIDs)*2) friends := make([]*model.Friend, 0, len(friendUserIDs)*2)
myFriendsSet := datautil.SliceSetAny(myFriends, func(friend *model.Friend) string { myFriendsSet := datautil.SliceSetAny(myFriends, func(friend *model.Friend) string {
return friend.FriendUserID return friend.FriendUserID

@ -86,7 +86,7 @@ func (c *ConversationLocalCache) GetConversation(ctx context.Context, userID, co
if err == nil { if err == nil {
log.ZDebug(ctx, "ConversationLocalCache GetConversation return", "userID", userID, "conversationID", conversationID, "value", val) log.ZDebug(ctx, "ConversationLocalCache GetConversation return", "userID", userID, "conversationID", conversationID, "value", val)
} else { } else {
log.ZError(ctx, "ConversationLocalCache GetConversation return", err, "userID", userID, "conversationID", conversationID) log.ZWarn(ctx, "ConversationLocalCache GetConversation return", err, "userID", userID, "conversationID", conversationID)
} }
}() }()
var cache cacheProto[pbconversation.Conversation] var cache cacheProto[pbconversation.Conversation]

@ -23,7 +23,6 @@ import (
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mq/memamq" "github.com/openimsdk/tools/mq/memamq"
"github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/system/program"
"github.com/openimsdk/tools/utils/idutil" "github.com/openimsdk/tools/utils/idutil"
@ -270,8 +269,8 @@ func WithUserRpcClient(userRpcClient *UserRpcClient) NotificationSenderOptions {
} }
const ( const (
notificationWorkerCount = 2 notificationWorkerCount = 16
notificationBufferSize = 200 notificationBufferSize = 1024 * 1024 * 2
) )
func NewNotificationSender(conf *config.Notification, opts ...NotificationSenderOptions) *NotificationSender { func NewNotificationSender(conf *config.Notification, opts ...NotificationSenderOptions) *NotificationSender {
@ -298,7 +297,8 @@ func WithRpcGetUserName() NotificationOptions {
} }
func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, opts ...NotificationOptions) { func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, opts ...NotificationOptions) {
ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)}) //ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)})
ctx = context.WithoutCancel(ctx)
ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(5)) ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(5))
defer cancel() defer cancel()
n := sdkws.NotificationElem{Detail: jsonutil.StructToJsonString(m)} n := sdkws.NotificationElem{Detail: jsonutil.StructToJsonString(m)}

Loading…
Cancel
Save