diff --git a/.github/sync-release.yml b/.github/sync-release.yml new file mode 100644 index 000000000..9d111aefd --- /dev/null +++ b/.github/sync-release.yml @@ -0,0 +1,16 @@ +OpenIMSDK/openim-docker: + - source: ./config + dest: ./openim-server/release/config + replace: true + - source: ./docs + dest: ./openim-server/release/docs + replace: true + - source: ./scripts + dest: ./openim-server/release/scripts + replace: true + - source: ./scripts + dest: ./scripts + replace: true + - source: ./Makefile + dest: ./Makefile + replace: true diff --git a/.github/sync.yml b/.github/sync.yml index 7ffefc79a..ee667d415 100644 --- a/.github/sync.yml +++ b/.github/sync.yml @@ -75,7 +75,7 @@ OpenIMSDK/OpenKF: dest: .github/.codecov.yml replace: false -openim-docker/openim-docker: +OpenIMSDK/openim-docker: - source: ./config dest: ./openim-server/main/config replace: true @@ -85,6 +85,12 @@ openim-docker/openim-docker: - source: ./scripts dest: ./openim-server/main/scripts replace: true + - source: ./scripts + dest: ./scripts + replace: true + - source: ./Makefile + dest: ./Makefile + replace: true group: # first group:common to all warehouses diff --git a/.github/workflows/sync-release.yml b/.github/workflows/sync-release.yml new file mode 100644 index 000000000..839040aff --- /dev/null +++ b/.github/workflows/sync-release.yml @@ -0,0 +1,43 @@ +# Copyright © 2023 KubeCub open source community. All rights reserved. +# Licensed under the MIT License (the "License"); +# you may not use this file except in compliance with the License. + +# https://github.com/BetaHuhn/repo-file-sync-action +name: Synchronize kubecub public code to other repositories +on: + push: + paths: + - scripts/* + - docs/* + - config/* + branches: + - release-v*.* + workflow_dispatch: + +jobs: + sync: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Run GitHub File Sync + uses: BetaHuhn/repo-file-sync-action@latest + with: + GH_INSTALLATION_TOKEN: "${{ secrets.BOT_GITHUB_TOKEN }}" + CONFIG_PATH: .github/sync-release.yml + ORIGINAL_MESSAGE: true + SKIP_PR: true + COMMIT_EACH_FILE: false + COMMIT_BODY: "🤖 kubbot to synchronize the warehouse" + GIT_EMAIL: "3293172751ysy@gmail.com" + GIT_USERNAME: "kubbot" + PR_BODY: 👌 kubecub provides automated community services + REVIEWERS: | + kubbot + cubxxw + PR_LABELS: | + file-sync + automerge + ASSIGNEES: | + kubbot diff --git a/cmd/openim-api/main.go b/cmd/openim-api/main.go index aee35171e..42bb7d1ac 100644 --- a/cmd/openim-api/main.go +++ b/cmd/openim-api/main.go @@ -18,9 +18,6 @@ import ( "context" "fmt" "net" - "net/http" - "os" - "runtime" "strconv" "time" @@ -46,16 +43,6 @@ func main() { } } -func startPprof() { - runtime.GOMAXPROCS(1) - runtime.SetMutexProfileFraction(1) - runtime.SetBlockProfileRate(1) - if err := http.ListenAndServe(":6060", nil); err != nil { - panic(err) - } - os.Exit(0) -} - func run(port int) error { if port == 0 { return fmt.Errorf("port is empty") @@ -92,7 +79,6 @@ func run(port int) error { } fmt.Println("start api server, address: ", address, ", OpenIM version: ", config.Version) log.ZInfo(context.Background(), "start server success", "address", address, "version", config.Version) - go startPprof() err = router.Run(address) if err != nil { log.ZError(context.Background(), "api run failed ", err, "address", address) diff --git a/go.mod b/go.mod index 8f18943a7..1e2fd2c9c 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( require github.com/google/uuid v1.3.0 require ( - github.com/OpenIMSDK/protocol v0.0.15 + github.com/OpenIMSDK/protocol v0.0.18 github.com/OpenIMSDK/tools v0.0.14 github.com/aliyun/aliyun-oss-go-sdk v2.2.8+incompatible github.com/go-redis/redis v6.15.9+incompatible diff --git a/go.sum b/go.sum index a361872a3..fcf36033e 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,8 @@ cloud.google.com/go/storage v1.30.1/go.mod h1:NfxhC0UJE1aXSx7CIIbCf7y9HKT7Biccwk firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4= firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/OpenIMSDK/protocol v0.0.15 h1:KrrvdHH9kFF/tFYL2FXRPAr2e5F5DctSHfHq6MQjUI4= -github.com/OpenIMSDK/protocol v0.0.15/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/OpenIMSDK/protocol v0.0.18 h1:hXukFiDMLZx7s+hDCQePIK9ABiHyNlobNL4MppvOuMY= +github.com/OpenIMSDK/protocol v0.0.18/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/tools v0.0.14 h1:WLof/+WxyPyRST+QkoTKubYCiV73uCLiL8pgnpH/yKQ= github.com/OpenIMSDK/tools v0.0.14/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= diff --git a/internal/api/friend.go b/internal/api/friend.go index 9542a61f9..2f708901e 100644 --- a/internal/api/friend.go +++ b/internal/api/friend.go @@ -88,3 +88,7 @@ func (o *FriendApi) IsFriend(c *gin.Context) { func (o *FriendApi) GetFriendIDs(c *gin.Context) { a2r.Call(friend.FriendClient.GetFriendIDs, o.Client, c) } + +func (o *FriendApi) GetSpecifiedFriendsInfo(c *gin.Context) { + a2r.Call(friend.FriendClient.GetSpecifiedFriendsInfo, o.Client, c) +} diff --git a/internal/api/msg.go b/internal/api/msg.go index 3d6d4bc5d..726889d56 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -167,6 +167,7 @@ func (m *MessageApi) DeleteMsgPhysical(c *gin.Context) { func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendMsgReq *msg.SendMsgReq, err error) { var data interface{} + log.ZDebug(c, "getSendMsgReq", "req", req.Content) switch req.ContentType { case constant.Text: data = apistruct.TextElem{} @@ -195,7 +196,7 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM if err := mapstructure.WeakDecode(req.Content, &data); err != nil { return nil, err } - log.ZDebug(c, "getSendMsgReq", "data", data) + log.ZDebug(c, "getSendMsgReq", "req", req.Content) if err := m.validate.Struct(data); err != nil { return nil, err } diff --git a/internal/api/route.go b/internal/api/route.go index 0a0201d43..a118092c4 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -106,6 +106,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive friendRouterGroup.POST("/import_friend", f.ImportFriends) friendRouterGroup.POST("/is_friend", f.IsFriend) friendRouterGroup.POST("/get_friend_id", f.GetFriendIDs) + friendRouterGroup.POST("/get_specified_friends_info", f.GetSpecifiedFriendsInfo) } g := NewGroupApi(*groupRpc) groupRouterGroup := r.Group("/group", ParseToken) diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index d761079e2..4a2575017 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -133,34 +133,42 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver // } // } } + cs, err := c.conversationDatabase.GetConversationsByConversationID(ctx, []string{req.Conversation.ConversationID}) + if err != nil { + return nil, err + } + if len(cs) == 0 { + return nil, errs.ErrRecordNotFound.Wrap("conversation not found") + } + conv := cs[0] var conversation tablerelation.ConversationModel conversation.ConversationID = req.Conversation.ConversationID conversation.ConversationType = req.Conversation.ConversationType conversation.UserID = req.Conversation.UserID conversation.GroupID = req.Conversation.GroupID m := make(map[string]interface{}) - if req.Conversation.RecvMsgOpt != nil { + if req.Conversation.RecvMsgOpt != nil && req.Conversation.RecvMsgOpt.Value != conv.RecvMsgOpt { m["recv_msg_opt"] = req.Conversation.RecvMsgOpt.Value } - if req.Conversation.AttachedInfo != nil { + if req.Conversation.AttachedInfo != nil && req.Conversation.AttachedInfo.Value != conv.AttachedInfo { m["attached_info"] = req.Conversation.AttachedInfo.Value } - if req.Conversation.Ex != nil { + if req.Conversation.Ex != nil && req.Conversation.Ex.Value != conv.Ex { m["ex"] = req.Conversation.Ex.Value } - if req.Conversation.IsPinned != nil { + if req.Conversation.IsPinned != nil && req.Conversation.IsPinned.Value != conv.IsPinned { m["is_pinned"] = req.Conversation.IsPinned.Value } - if req.Conversation.GroupAtType != nil { + if req.Conversation.GroupAtType != nil && req.Conversation.GroupAtType.Value != conv.GroupAtType { m["group_at_type"] = req.Conversation.GroupAtType.Value } - if req.Conversation.MsgDestructTime != nil { + if req.Conversation.MsgDestructTime != nil && req.Conversation.MsgDestructTime.Value != conv.MsgDestructTime { m["msg_destruct_time"] = req.Conversation.MsgDestructTime.Value } - if req.Conversation.IsMsgDestruct != nil { + if req.Conversation.IsMsgDestruct != nil && req.Conversation.IsMsgDestruct.Value != conv.IsMsgDestruct { m["is_msg_destruct"] = req.Conversation.IsMsgDestruct.Value } - if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.SuperGroupChatType { + if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.SuperGroupChatType && len(m) > 0 { var conversations []*tablerelation.ConversationModel for _, ownerUserID := range req.UserIDs { conversation2 := conversation @@ -175,15 +183,16 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver c.conversationNotificationSender.ConversationSetPrivateNotification(ctx, userID, req.Conversation.UserID, req.Conversation.IsPrivateChat.Value, req.Conversation.ConversationID) } } - if req.Conversation.BurnDuration != nil { + if req.Conversation.BurnDuration != nil && req.Conversation.BurnDuration.Value != conv.BurnDuration { m["burn_duration"] = req.Conversation.BurnDuration.Value } - err := c.conversationDatabase.SetUsersConversationFiledTx(ctx, req.UserIDs, &conversation, m) - if err != nil { - return nil, err - } - for _, v := range req.UserIDs { - c.conversationNotificationSender.ConversationChangeNotification(ctx, v, []string{req.Conversation.ConversationID}) + if len(m) > 0 { + if err := c.conversationDatabase.SetUsersConversationFiledTx(ctx, req.UserIDs, &conversation, m); err != nil { + return nil, err + } + for _, v := range req.UserIDs { + c.conversationNotificationSender.ConversationChangeNotification(ctx, v, []string{req.Conversation.ConversationID}) + } } return &pbconversation.SetConversationsResp{}, nil } diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 00ea68392..7b753faa5 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -17,6 +17,8 @@ package friend import ( "context" + "github.com/OpenIMSDK/protocol/sdkws" + "github.com/OpenIMSDK/Open-IM-Server/pkg/authverify" "github.com/OpenIMSDK/tools/log" @@ -41,11 +43,12 @@ import ( ) type friendServer struct { - friendDatabase controller.FriendDatabase - blackDatabase controller.BlackDatabase - userRpcClient *rpcclient.UserRpcClient - notificationSender *notification.FriendNotificationSender - RegisterCenter registry.SvcDiscoveryRegistry + friendDatabase controller.FriendDatabase + blackDatabase controller.BlackDatabase + userRpcClient *rpcclient.UserRpcClient + notificationSender *notification.FriendNotificationSender + conversationRpcClient rpcclient.ConversationRpcClient + RegisterCenter registry.SvcDiscoveryRegistry } func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { @@ -79,9 +82,10 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { blackDB, cache.NewBlackCacheRedis(rdb, blackDB, cache.GetDefaultOpt()), ), - userRpcClient: &userRpcClient, - notificationSender: notificationSender, - RegisterCenter: client, + userRpcClient: &userRpcClient, + notificationSender: notificationSender, + RegisterCenter: client, + conversationRpcClient: rpcclient.NewConversationRpcClient(client), }) return nil } @@ -131,17 +135,22 @@ func (s *friendServer) ImportFriends( if _, err := s.userRpcClient.GetUsersInfo(ctx, append([]string{req.OwnerUserID}, req.FriendUserIDs...)); err != nil { return nil, err } - if utils.Contain(req.OwnerUserID, req.FriendUserIDs...) { return nil, errs.ErrCanNotAddYourself.Wrap() } if utils.Duplicate(req.FriendUserIDs) { return nil, errs.ErrArgs.Wrap("friend userID repeated") } - if err := s.friendDatabase.BecomeFriends(ctx, req.OwnerUserID, req.FriendUserIDs, constant.BecomeFriendByImport); err != nil { return nil, err } + for _, userID := range req.FriendUserIDs { + s.notificationSender.FriendApplicationAgreedNotification(ctx, &pbfriend.RespondFriendApplyReq{ + FromUserID: req.OwnerUserID, + ToUserID: userID, + HandleResult: constant.FriendResponseAgree, + }) + } return &pbfriend.ImportFriendResp{}, nil } @@ -350,3 +359,66 @@ func (s *friendServer) GetFriendIDs( } return resp, nil } + +func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *pbfriend.GetSpecifiedFriendsInfoReq) (*pbfriend.GetSpecifiedFriendsInfoResp, error) { + if len(req.UserIDList) == 0 { + return nil, errs.ErrArgs.Wrap("userIDList is empty") + } + if utils.Duplicate(req.UserIDList) { + return nil, errs.ErrArgs.Wrap("userIDList repeated") + } + userMap, err := s.userRpcClient.GetUsersInfoMap(ctx, req.UserIDList) + if err != nil { + return nil, err + } + friends, err := s.friendDatabase.FindFriendsWithError(ctx, req.OwnerUserID, req.UserIDList) + if err != nil { + return nil, err + } + blacks, err := s.blackDatabase.FindBlackInfos(ctx, req.OwnerUserID, req.UserIDList) + if err != nil { + return nil, err + } + friendMap := utils.SliceToMap(friends, func(e *tablerelation.FriendModel) string { + return e.FriendUserID + }) + blackMap := utils.SliceToMap(blacks, func(e *tablerelation.BlackModel) string { + return e.BlockUserID + }) + resp := &pbfriend.GetSpecifiedFriendsInfoResp{ + Infos: make([]*pbfriend.GetSpecifiedFriendsInfoInfo, 0, len(req.UserIDList)), + } + for _, userID := range req.UserIDList { + user := userMap[userID] + if user == nil { + continue + } + var friendInfo *sdkws.FriendInfo + if friend := friendMap[userID]; friend != nil { + friendInfo = &sdkws.FriendInfo{ + OwnerUserID: friend.OwnerUserID, + Remark: friend.Remark, + CreateTime: friend.CreateTime.UnixMilli(), + AddSource: friend.AddSource, + OperatorUserID: friend.OperatorUserID, + Ex: friend.Ex, + } + } + var blackInfo *sdkws.BlackInfo + if black := blackMap[userID]; black != nil { + blackInfo = &sdkws.BlackInfo{ + OwnerUserID: black.OwnerUserID, + CreateTime: black.CreateTime.UnixMilli(), + AddSource: black.AddSource, + OperatorUserID: black.OperatorUserID, + Ex: black.Ex, + } + } + resp.Infos = append(resp.Infos, &pbfriend.GetSpecifiedFriendsInfoInfo{ + UserInfo: user, + FriendInfo: friendInfo, + BlackInfo: blackInfo, + }) + } + return resp, nil +} diff --git a/internal/rpc/group/fill.go b/internal/rpc/group/fill.go index 9044cdc57..e4d03de36 100644 --- a/internal/rpc/group/fill.go +++ b/internal/rpc/group/fill.go @@ -22,12 +22,7 @@ import ( relationtb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" ) -func (s *groupServer) FindGroupMember( - ctx context.Context, - groupIDs []string, - userIDs []string, - roleLevels []int32, -) ([]*relationtb.GroupMemberModel, error) { +func (s *groupServer) FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relationtb.GroupMemberModel, error) { members, err := s.GroupDatabase.FindGroupMember(ctx, groupIDs, userIDs, roleLevels) if err != nil { return nil, err diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index a19f68b30..b09743759 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -114,6 +114,24 @@ type groupServer struct { msgRpcClient rpcclient.MessageRpcClient } +func (s *groupServer) NotificationUserInfoUpdate(ctx context.Context, req *pbgroup.NotificationUserInfoUpdateReq) (*pbgroup.NotificationUserInfoUpdateResp, error) { + defer log.ZDebug(ctx, "return") + + members, err := s.GroupDatabase.FindGroupMember(ctx, nil, []string{req.UserID}, nil) + if err != nil { + return nil, err + } + for _, member := range members { + if member.Nickname != "" && member.FaceURL != "" { + continue + } + if err := s.Notification.GroupMemberInfoSetNotification(ctx, member.GroupID, member.UserID); err != nil { + log.ZError(ctx, "setGroupMemberInfo notification failed", err, "member", member.UserID, "groupID", member.GroupID) + } + } + return &pbgroup.NotificationUserInfoUpdateResp{}, nil +} + func (s *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error { if !authverify.IsAppManagerUid(ctx) { groupMember, err := s.GroupDatabase.TakeGroupMember(ctx, groupID, mcontext.GetOpUserID(ctx)) diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index 7297dc83b..de85775d8 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -128,7 +128,9 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe func (m *msgServer) encapsulateMsgData(msg *sdkws.MsgData) { msg.ServerMsgID = GetMsgID(msg.SendID) - msg.SendTime = utils.GetCurrentTimestampByMill() + if msg.SendTime == 0 { + msg.SendTime = utils.GetCurrentTimestampByMill() + } switch msg.ContentType { case constant.Text: fallthrough diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index a1cb0db09..32a3e314b 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -50,6 +50,7 @@ type userServer struct { friendNotificationSender *notification.FriendNotificationSender userNotificationSender *notification.UserNotificationSender friendRpcClient *rpcclient.FriendRpcClient + groupRpcClient *rpcclient.GroupRpcClient RegisterCenter registry.SvcDiscoveryRegistry } @@ -81,11 +82,13 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase()) database := controller.NewUserDatabase(userDB, cache, tx.NewGorm(db), userMongoDB) friendRpcClient := rpcclient.NewFriendRpcClient(client) + groupRpcClient := rpcclient.NewGroupRpcClient(client) msgRpcClient := rpcclient.NewMessageRpcClient(client) u := &userServer{ UserDatabase: database, RegisterCenter: client, friendRpcClient: &friendRpcClient, + groupRpcClient: &groupRpcClient, friendNotificationSender: notification.NewFriendNotificationSender(&msgRpcClient, notification.WithDBFunc(database.FindWithError)), userNotificationSender: notification.NewUserNotificationSender(&msgRpcClient, notification.WithUserFunc(database.FindWithError)), } @@ -125,6 +128,11 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI if err != nil { return nil, err } + if req.UserInfo.Nickname != "" || req.UserInfo.FaceURL != "" { + if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { + log.ZError(ctx, "NotificationUserInfoUpdate", err) + } + } for _, friendID := range friends { s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID) } diff --git a/pkg/apistruct/msg.go b/pkg/apistruct/msg.go index a0c5e1429..12cf253a0 100644 --- a/pkg/apistruct/msg.go +++ b/pkg/apistruct/msg.go @@ -72,8 +72,9 @@ type CustomElem struct { Description string `mapstructure:"description"` Extension string `mapstructure:"extension"` } + type TextElem struct { - Text string `mapstructure:"text" validate:"required"` + Content string `json:"content" validate:"required"` } type RevokeElem struct { diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 754753b3b..c6dd41419 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -81,9 +81,16 @@ type configStruct struct { } `yaml:"redis"` Kafka struct { - Username string `yaml:"username"` - Password string `yaml:"password"` - Addr []string `yaml:"addr"` + Username string `yaml:"username"` + Password string `yaml:"password"` + Addr []string `yaml:"addr"` + TLS *struct { + CACrt string `yaml:"caCrt"` + ClientCrt string `yaml:"clientCrt"` + ClientKey string `yaml:"clientKey"` + ClientKeyPwd string `yaml:"clientKeyPwd"` + InsecureSkipVerify bool `yaml:"insecureSkipVerify"` + } `yaml:"tls"` LatestMsgToRedis struct { Topic string `yaml:"topic"` } `yaml:"latestMsgToRedis"` diff --git a/pkg/common/db/cache/user.go b/pkg/common/db/cache/user.go index 61b4df9d8..2db5e2190 100644 --- a/pkg/common/db/cache/user.go +++ b/pkg/common/db/cache/user.go @@ -244,7 +244,7 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, list []*user.OnlineS } if newPlatformIDs == nil { onlineStatus.Status = constant.Offline - onlineStatus.PlatformIDs = nil + onlineStatus.PlatformIDs = []int32{} newjsonData, err := json.Marshal(&onlineStatus) if err != nil { return errs.Wrap(err) diff --git a/pkg/common/db/controller/black.go b/pkg/common/db/controller/black.go index 4a239829f..a962df213 100644 --- a/pkg/common/db/controller/black.go +++ b/pkg/common/db/controller/black.go @@ -36,6 +36,7 @@ type BlackDatabase interface { pageNumber, showNumber int32, ) (blacks []*relation.BlackModel, total int64, err error) FindBlackIDs(ctx context.Context, ownerUserID string) (blackIDs []string, err error) + FindBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*relation.BlackModel, err error) // CheckIn 检查user2是否在user1的黑名单列表中(inUser1Blacks==true) 检查user1是否在user2的黑名单列表中(inUser2Blacks==true) CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Blacks bool, inUser2Blacks bool, err error) } @@ -102,3 +103,7 @@ func (b *blackDatabase) CheckIn( func (b *blackDatabase) FindBlackIDs(ctx context.Context, ownerUserID string) (blackIDs []string, err error) { return b.cache.GetBlackIDs(ctx, ownerUserID) } + +func (b *blackDatabase) FindBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*relation.BlackModel, err error) { + return b.black.FindOwnerBlackInfos(ctx, ownerUserID, userIDs) +} diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index f5fd9d2af..7ece8a07e 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -41,83 +41,34 @@ type GroupDatabase interface { TakeGroup(ctx context.Context, groupID string) (group *relationtb.GroupModel, err error) FindGroup(ctx context.Context, groupIDs []string) (groups []*relationtb.GroupModel, err error) FindNotDismissedGroup(ctx context.Context, groupIDs []string) (groups []*relationtb.GroupModel, err error) - SearchGroup( - ctx context.Context, - keyword string, - pageNumber, showNumber int32, - ) (uint32, []*relationtb.GroupModel, error) + SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationtb.GroupModel, error) UpdateGroup(ctx context.Context, groupID string, data map[string]any) error DismissGroup(ctx context.Context, groupID string, deleteMember bool) error // 解散群,并删除群成员 GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error) // GroupMember - TakeGroupMember( - ctx context.Context, - groupID string, - userID string, - ) (groupMember *relationtb.GroupMemberModel, err error) + TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationtb.GroupMemberModel, err error) TakeGroupOwner(ctx context.Context, groupID string) (*relationtb.GroupMemberModel, error) - FindGroupMember( - ctx context.Context, - groupIDs []string, - userIDs []string, - roleLevels []int32, - ) ([]*relationtb.GroupMemberModel, error) + FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relationtb.GroupMemberModel, error) FindGroupMemberUserID(ctx context.Context, groupID string) ([]string, error) FindGroupMemberNum(ctx context.Context, groupID string) (uint32, error) FindUserManagedGroupID(ctx context.Context, userID string) (groupIDs []string, err error) - PageGroupRequest( - ctx context.Context, - groupIDs []string, - pageNumber, showNumber int32, - ) (uint32, []*relationtb.GroupRequestModel, error) - // PageGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, - // showNumber int32) (uint32, []*relationtb.GroupMemberModel, error) - PageGetJoinGroup( - ctx context.Context, - userID string, - pageNumber, showNumber int32, - ) (total uint32, totalGroupMembers []*relationtb.GroupMemberModel, err error) - PageGetGroupMember( - ctx context.Context, - groupID string, - pageNumber, showNumber int32, - ) (total uint32, totalGroupMembers []*relationtb.GroupMemberModel, err error) - SearchGroupMember( - ctx context.Context, - keyword string, - groupIDs []string, - userIDs []string, - roleLevels []int32, - pageNumber, showNumber int32, - ) (uint32, []*relationtb.GroupMemberModel, error) - HandlerGroupRequest( - ctx context.Context, - groupID string, - userID string, - handledMsg string, - handleResult int32, - member *relationtb.GroupMemberModel, - ) error + PageGroupRequest(ctx context.Context, groupIDs []string, pageNumber, showNumber int32) (uint32, []*relationtb.GroupRequestModel, error) + + PageGetJoinGroup(ctx context.Context, userID string, pageNumber, showNumber int32) (total uint32, totalGroupMembers []*relationtb.GroupMemberModel, err error) + PageGetGroupMember(ctx context.Context, groupID string, pageNumber, showNumber int32) (total uint32, totalGroupMembers []*relationtb.GroupMemberModel, err error) + SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationtb.GroupMemberModel, error) + HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationtb.GroupMemberModel) error DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationtb.GroupSimpleUserID, error) MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error) - TransferGroupOwner( - ctx context.Context, - groupID string, - oldOwnerUserID, newOwnerUserID string, - roleLevel int32, - ) error // 转让群 + TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error // 转让群 UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error UpdateGroupMembers(ctx context.Context, data []*relationtb.BatchUpdateGroupMember) error // GroupRequest CreateGroupRequest(ctx context.Context, requests []*relationtb.GroupRequestModel) error TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationtb.GroupRequestModel, error) FindGroupRequests(ctx context.Context, groupID string, userIDs []string) (int64, []*relationtb.GroupRequestModel, error) - PageGroupRequestUser( - ctx context.Context, - userID string, - pageNumber, showNumber int32, - ) (uint32, []*relationtb.GroupRequestModel, error) + PageGroupRequestUser(ctx context.Context, userID string, pageNumber, showNumber int32) (uint32, []*relationtb.GroupRequestModel, error) // SuperGroupModelInterface FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelationtb.SuperGroupModel, error) FindJoinSuperGroup(ctx context.Context, userID string) ([]string, error) @@ -310,12 +261,22 @@ func (g *groupDatabase) PageGroupRequest( return g.groupRequestDB.PageGroup(ctx, groupIDs, pageNumber, showNumber) } -func (g *groupDatabase) FindGroupMember( - ctx context.Context, - groupIDs []string, - userIDs []string, - roleLevels []int32, -) (totalGroupMembers []*relationtb.GroupMemberModel, err error) { +func (g *groupDatabase) FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) (totalGroupMembers []*relationtb.GroupMemberModel, err error) { + if len(groupIDs) == 0 && len(roleLevels) == 0 && len(userIDs) == 1 { + gIDs, err := g.cache.GetJoinedGroupIDs(ctx, userIDs[0]) + if err != nil { + return nil, err + } + var res []*relationtb.GroupMemberModel + for _, groupID := range gIDs { + v, err := g.cache.GetGroupMemberInfo(ctx, groupID, userIDs[0]) + if err != nil { + return nil, err + } + res = append(res, v) + } + return res, nil + } if len(roleLevels) == 0 { for _, groupID := range groupIDs { groupMembers, err := g.cache.GetGroupMembersInfo(ctx, groupID, userIDs) @@ -451,13 +412,8 @@ func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string return m, nil } -func (g *groupDatabase) TransferGroupOwner( - ctx context.Context, - groupID string, - oldOwnerUserID, newOwnerUserID string, - roleLevel int32, -) error { - if err := g.tx.Transaction(func(tx any) error { +func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error { + return g.tx.Transaction(func(tx any) error { rowsAffected, err := g.groupMemberDB.NewTx(tx).UpdateRoleLevel(ctx, groupID, oldOwnerUserID, roleLevel) if err != nil { return err @@ -472,11 +428,8 @@ func (g *groupDatabase) TransferGroupOwner( if rowsAffected != 1 { return utils.Wrap(fmt.Errorf("newOwnerUserID %s rowsAffected = %d", newOwnerUserID, rowsAffected), "") } - return nil - }); err != nil { - return err - } - return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).ExecDel(ctx) + return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).DelGroupMembersHash(groupID).ExecDel(ctx) + }) } func (g *groupDatabase) UpdateGroupMember( diff --git a/pkg/common/db/relation/black_model.go b/pkg/common/db/relation/black_model.go index 9684b6f77..3946c8fc2 100644 --- a/pkg/common/db/relation/black_model.go +++ b/pkg/common/db/relation/black_model.go @@ -17,6 +17,8 @@ package relation import ( "context" + "github.com/OpenIMSDK/tools/errs" + "github.com/OpenIMSDK/tools/ormutil" "gorm.io/gorm" @@ -103,3 +105,7 @@ func (b *BlackGorm) FindBlackUserIDs(ctx context.Context, ownerUserID string) (b "", ) } + +func (b *BlackGorm) FindOwnerBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*relation.BlackModel, err error) { + return blacks, errs.Wrap(b.db(ctx).Where("owner_user_id = ? and block_user_id in ?", ownerUserID, userIDs).Find(&blacks).Error) +} diff --git a/pkg/common/db/table/relation/black.go b/pkg/common/db/table/relation/black.go index ec7ca7a56..59dd12122 100644 --- a/pkg/common/db/table/relation/black.go +++ b/pkg/common/db/table/relation/black.go @@ -43,10 +43,7 @@ type BlackModelInterface interface { Update(ctx context.Context, blacks []*BlackModel) (err error) Find(ctx context.Context, blacks []*BlackModel) (blackList []*BlackModel, err error) Take(ctx context.Context, ownerUserID, blockUserID string) (black *BlackModel, err error) - FindOwnerBlacks( - ctx context.Context, - ownerUserID string, - pageNumber, showNumber int32, - ) (blacks []*BlackModel, total int64, err error) + FindOwnerBlacks(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (blacks []*BlackModel, total int64, err error) + FindOwnerBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*BlackModel, err error) FindBlackUserIDs(ctx context.Context, ownerUserID string) (blackUserIDs []string, err error) } diff --git a/pkg/common/kafka/consumer.go b/pkg/common/kafka/consumer.go index 67bc3977b..e253ec5e0 100644 --- a/pkg/common/kafka/consumer.go +++ b/pkg/common/kafka/consumer.go @@ -40,6 +40,7 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer { consumerConfig.Net.SASL.User = config.Config.Kafka.Username consumerConfig.Net.SASL.Password = config.Config.Kafka.Password } + SetupTLSConfig(consumerConfig) consumer, err := sarama.NewConsumer(p.addr, consumerConfig) if err != nil { panic(err.Error()) diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index b4bd81660..da62fbe65 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -19,6 +19,8 @@ import ( "github.com/OpenIMSDK/tools/log" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/Shopify/sarama" ) @@ -35,11 +37,17 @@ type MConsumerGroupConfig struct { } func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup { - config := sarama.NewConfig() - config.Version = consumerConfig.KafkaVersion - config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial - config.Consumer.Return.Errors = consumerConfig.IsReturnErr - consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config) + consumerGroupConfig := sarama.NewConfig() + consumerGroupConfig.Version = consumerConfig.KafkaVersion + consumerGroupConfig.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial + consumerGroupConfig.Consumer.Return.Errors = consumerConfig.IsReturnErr + if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" { + consumerGroupConfig.Net.SASL.Enable = true + consumerGroupConfig.Net.SASL.User = config.Config.Kafka.Username + consumerGroupConfig.Net.SASL.Password = config.Config.Kafka.Password + } + SetupTLSConfig(consumerGroupConfig) + consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig) if err != nil { panic(err.Error()) } diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 938757d40..b7ec32714 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -60,6 +60,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer { } p.addr = addr p.topic = topic + SetupTLSConfig(p.config) var producer sarama.SyncProducer var err error for i := 0; i <= maxRetry; i++ { diff --git a/pkg/common/kafka/util.go b/pkg/common/kafka/util.go new file mode 100644 index 000000000..9d5678648 --- /dev/null +++ b/pkg/common/kafka/util.go @@ -0,0 +1,21 @@ +package kafka + +import ( + "github.com/Shopify/sarama" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tls" +) + +// SetupTLSConfig set up the TLS config from config file. +func SetupTLSConfig(cfg *sarama.Config) { + if config.Config.Kafka.TLS != nil { + cfg.Net.TLS.Enable = true + cfg.Net.TLS.Config = tls.NewTLSConfig( + config.Config.Kafka.TLS.ClientCrt, + config.Config.Kafka.TLS.ClientKey, + config.Config.Kafka.TLS.CACrt, + []byte(config.Config.Kafka.TLS.ClientKeyPwd), + ) + } +} diff --git a/pkg/common/tls/tls.go b/pkg/common/tls/tls.go new file mode 100644 index 000000000..5f84f87e3 --- /dev/null +++ b/pkg/common/tls/tls.go @@ -0,0 +1,70 @@ +package tls + +import ( + "crypto/tls" + "crypto/x509" + "encoding/pem" + "errors" + "os" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" +) + +func decryptPEM(data []byte, passphrase []byte) ([]byte, error) { + if len(passphrase) == 0 { + return data, nil + } + b, _ := pem.Decode(data) + d, err := x509.DecryptPEMBlock(b, passphrase) + if err != nil { + return nil, err + } + return pem.EncodeToMemory(&pem.Block{ + Type: b.Type, + Bytes: d, + }), nil +} + +func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + return decryptPEM(data, pwd) +} + +// NewTLSConfig setup the TLS config from general config file. +func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte) *tls.Config { + tlsConfig := tls.Config{} + + if clientCertFile != "" && clientKeyFile != "" { + certPEMBlock, err := os.ReadFile(clientCertFile) + if err != nil { + panic(err) + } + keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd) + if err != nil { + panic(err) + } + cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock) + if err != nil { + panic(err) + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + + caCert, err := os.ReadFile(caCertFile) + if err != nil { + panic(err) + } + caCertPool := x509.NewCertPool() + ok := caCertPool.AppendCertsFromPEM(caCert) + if !ok { + panic(errors.New("not a valid CA cert")) + } + tlsConfig.RootCAs = caCertPool + + tlsConfig.InsecureSkipVerify = config.Config.Kafka.TLS.InsecureSkipVerify + + return &tlsConfig +} diff --git a/pkg/rpcclient/group.go b/pkg/rpcclient/group.go index 7dc333de6..d85321cbe 100644 --- a/pkg/rpcclient/group.go +++ b/pkg/rpcclient/group.go @@ -208,3 +208,10 @@ func (g *GroupRpcClient) DismissGroup(ctx context.Context, groupID string) error }) return err } + +func (g *GroupRpcClient) NotificationUserInfoUpdate(ctx context.Context, userID string) error { + _, err := g.Client.NotificationUserInfoUpdate(ctx, &group.NotificationUserInfoUpdateReq{ + UserID: userID, + }) + return err +} diff --git a/pkg/rpcclient/notification/group.go b/pkg/rpcclient/notification/group.go index 87c657efb..ee62f08b4 100644 --- a/pkg/rpcclient/notification/group.go +++ b/pkg/rpcclient/notification/group.go @@ -18,6 +18,8 @@ import ( "context" "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/authverify" + "github.com/OpenIMSDK/protocol/constant" pbgroup "github.com/OpenIMSDK/protocol/group" "github.com/OpenIMSDK/protocol/sdkws" @@ -235,11 +237,20 @@ func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws } userID := mcontext.GetOpUserID(ctx) if groupID != "" { - member, err := g.db.TakeGroupMember(ctx, groupID, userID) - if err == nil { - *opUser = g.groupMemberDB2PB(member, 0) - } else if !errs.ErrRecordNotFound.Is(err) { - return err + if authverify.IsManagerUserID(userID) { + *opUser = &sdkws.GroupMemberFullInfo{ + GroupID: groupID, + UserID: userID, + RoleLevel: constant.GroupAdmin, + AppMangerLevel: constant.AppAdmin, + } + } else { + member, err := g.db.TakeGroupMember(ctx, groupID, userID) + if err == nil { + *opUser = g.groupMemberDB2PB(member, 0) + } else if !errs.ErrRecordNotFound.Is(err) { + return err + } } } user, err := g.getUser(ctx, userID) diff --git a/pkg/rpcclient/user.go b/pkg/rpcclient/user.go index 0b2118f39..5ea747381 100644 --- a/pkg/rpcclient/user.go +++ b/pkg/rpcclient/user.go @@ -147,7 +147,10 @@ func (u *UserRpcClient) GetUserGlobalMsgRecvOpt(ctx context.Context, userID stri resp, err := u.Client.GetGlobalRecvMessageOpt(ctx, &user.GetGlobalRecvMessageOptReq{ UserID: userID, }) - return resp.GlobalRecvMsgOpt, err + if err != nil { + return 0, err + } + return resp.GlobalRecvMsgOpt, nil } // Access verifies the access rights for the provided user ID. diff --git a/tools/component/component.go b/tools/component/component.go index 295ac44b1..be454f900 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -39,6 +39,7 @@ import ( "gorm.io/gorm" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka" "github.com/minio/minio-go/v7/pkg/credentials" ) @@ -274,6 +275,7 @@ func checkKafka() error { cfg.Net.SASL.User = config.Config.Kafka.Username cfg.Net.SASL.Password = config.Config.Kafka.Password } + kafka.SetupTLSConfig(cfg) kafkaClient, err := sarama.NewClient(config.Config.Kafka.Addr, cfg) if err != nil { return errs.Wrap(err)