fix: merge main code

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>
pull/1038/head
Gordon 2 years ago
commit 7d95544ce7

@ -0,0 +1,16 @@
OpenIMSDK/openim-docker:
- source: ./config
dest: ./openim-server/release/config
replace: true
- source: ./docs
dest: ./openim-server/release/docs
replace: true
- source: ./scripts
dest: ./openim-server/release/scripts
replace: true
- source: ./scripts
dest: ./scripts
replace: true
- source: ./Makefile
dest: ./Makefile
replace: true

8
.github/sync.yml vendored

@ -75,7 +75,7 @@ OpenIMSDK/OpenKF:
dest: .github/.codecov.yml
replace: false
openim-docker/openim-docker:
OpenIMSDK/openim-docker:
- source: ./config
dest: ./openim-server/main/config
replace: true
@ -85,6 +85,12 @@ openim-docker/openim-docker:
- source: ./scripts
dest: ./openim-server/main/scripts
replace: true
- source: ./scripts
dest: ./scripts
replace: true
- source: ./Makefile
dest: ./Makefile
replace: true
group:
# first groupcommon to all warehouses

@ -0,0 +1,43 @@
# Copyright © 2023 KubeCub open source community. All rights reserved.
# Licensed under the MIT License (the "License");
# you may not use this file except in compliance with the License.
# https://github.com/BetaHuhn/repo-file-sync-action
name: Synchronize kubecub public code to other repositories
on:
push:
paths:
- scripts/*
- docs/*
- config/*
branches:
- release-v*.*
workflow_dispatch:
jobs:
sync:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Run GitHub File Sync
uses: BetaHuhn/repo-file-sync-action@latest
with:
GH_INSTALLATION_TOKEN: "${{ secrets.BOT_GITHUB_TOKEN }}"
CONFIG_PATH: .github/sync-release.yml
ORIGINAL_MESSAGE: true
SKIP_PR: true
COMMIT_EACH_FILE: false
COMMIT_BODY: "🤖 kubbot to synchronize the warehouse"
GIT_EMAIL: "3293172751ysy@gmail.com"
GIT_USERNAME: "kubbot"
PR_BODY: 👌 kubecub provides automated community services
REVIEWERS: |
kubbot
cubxxw
PR_LABELS: |
file-sync
automerge
ASSIGNEES: |
kubbot

@ -18,9 +18,6 @@ import (
"context"
"fmt"
"net"
"net/http"
"os"
"runtime"
"strconv"
"time"
@ -46,16 +43,6 @@ func main() {
}
}
func startPprof() {
runtime.GOMAXPROCS(1)
runtime.SetMutexProfileFraction(1)
runtime.SetBlockProfileRate(1)
if err := http.ListenAndServe(":6060", nil); err != nil {
panic(err)
}
os.Exit(0)
}
func run(port int) error {
if port == 0 {
return fmt.Errorf("port is empty")
@ -92,7 +79,6 @@ func run(port int) error {
}
fmt.Println("start api server, address: ", address, ", OpenIM version: ", config.Version)
log.ZInfo(context.Background(), "start server success", "address", address, "version", config.Version)
go startPprof()
err = router.Run(address)
if err != nil {
log.ZError(context.Background(), "api run failed ", err, "address", address)

@ -37,7 +37,7 @@ require (
require github.com/google/uuid v1.3.0
require (
github.com/OpenIMSDK/protocol v0.0.15
github.com/OpenIMSDK/protocol v0.0.18
github.com/OpenIMSDK/tools v0.0.14
github.com/aliyun/aliyun-oss-go-sdk v2.2.8+incompatible
github.com/go-redis/redis v6.15.9+incompatible

@ -17,8 +17,8 @@ cloud.google.com/go/storage v1.30.1/go.mod h1:NfxhC0UJE1aXSx7CIIbCf7y9HKT7Biccwk
firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4=
firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/OpenIMSDK/protocol v0.0.15 h1:KrrvdHH9kFF/tFYL2FXRPAr2e5F5DctSHfHq6MQjUI4=
github.com/OpenIMSDK/protocol v0.0.15/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/protocol v0.0.18 h1:hXukFiDMLZx7s+hDCQePIK9ABiHyNlobNL4MppvOuMY=
github.com/OpenIMSDK/protocol v0.0.18/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.14 h1:WLof/+WxyPyRST+QkoTKubYCiV73uCLiL8pgnpH/yKQ=
github.com/OpenIMSDK/tools v0.0.14/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=

@ -88,3 +88,7 @@ func (o *FriendApi) IsFriend(c *gin.Context) {
func (o *FriendApi) GetFriendIDs(c *gin.Context) {
a2r.Call(friend.FriendClient.GetFriendIDs, o.Client, c)
}
func (o *FriendApi) GetSpecifiedFriendsInfo(c *gin.Context) {
a2r.Call(friend.FriendClient.GetSpecifiedFriendsInfo, o.Client, c)
}

@ -167,6 +167,7 @@ func (m *MessageApi) DeleteMsgPhysical(c *gin.Context) {
func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendMsgReq *msg.SendMsgReq, err error) {
var data interface{}
log.ZDebug(c, "getSendMsgReq", "req", req.Content)
switch req.ContentType {
case constant.Text:
data = apistruct.TextElem{}
@ -195,7 +196,7 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
if err := mapstructure.WeakDecode(req.Content, &data); err != nil {
return nil, err
}
log.ZDebug(c, "getSendMsgReq", "data", data)
log.ZDebug(c, "getSendMsgReq", "req", req.Content)
if err := m.validate.Struct(data); err != nil {
return nil, err
}

@ -106,6 +106,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
friendRouterGroup.POST("/import_friend", f.ImportFriends)
friendRouterGroup.POST("/is_friend", f.IsFriend)
friendRouterGroup.POST("/get_friend_id", f.GetFriendIDs)
friendRouterGroup.POST("/get_specified_friends_info", f.GetSpecifiedFriendsInfo)
}
g := NewGroupApi(*groupRpc)
groupRouterGroup := r.Group("/group", ParseToken)

@ -133,34 +133,42 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
// }
// }
}
cs, err := c.conversationDatabase.GetConversationsByConversationID(ctx, []string{req.Conversation.ConversationID})
if err != nil {
return nil, err
}
if len(cs) == 0 {
return nil, errs.ErrRecordNotFound.Wrap("conversation not found")
}
conv := cs[0]
var conversation tablerelation.ConversationModel
conversation.ConversationID = req.Conversation.ConversationID
conversation.ConversationType = req.Conversation.ConversationType
conversation.UserID = req.Conversation.UserID
conversation.GroupID = req.Conversation.GroupID
m := make(map[string]interface{})
if req.Conversation.RecvMsgOpt != nil {
if req.Conversation.RecvMsgOpt != nil && req.Conversation.RecvMsgOpt.Value != conv.RecvMsgOpt {
m["recv_msg_opt"] = req.Conversation.RecvMsgOpt.Value
}
if req.Conversation.AttachedInfo != nil {
if req.Conversation.AttachedInfo != nil && req.Conversation.AttachedInfo.Value != conv.AttachedInfo {
m["attached_info"] = req.Conversation.AttachedInfo.Value
}
if req.Conversation.Ex != nil {
if req.Conversation.Ex != nil && req.Conversation.Ex.Value != conv.Ex {
m["ex"] = req.Conversation.Ex.Value
}
if req.Conversation.IsPinned != nil {
if req.Conversation.IsPinned != nil && req.Conversation.IsPinned.Value != conv.IsPinned {
m["is_pinned"] = req.Conversation.IsPinned.Value
}
if req.Conversation.GroupAtType != nil {
if req.Conversation.GroupAtType != nil && req.Conversation.GroupAtType.Value != conv.GroupAtType {
m["group_at_type"] = req.Conversation.GroupAtType.Value
}
if req.Conversation.MsgDestructTime != nil {
if req.Conversation.MsgDestructTime != nil && req.Conversation.MsgDestructTime.Value != conv.MsgDestructTime {
m["msg_destruct_time"] = req.Conversation.MsgDestructTime.Value
}
if req.Conversation.IsMsgDestruct != nil {
if req.Conversation.IsMsgDestruct != nil && req.Conversation.IsMsgDestruct.Value != conv.IsMsgDestruct {
m["is_msg_destruct"] = req.Conversation.IsMsgDestruct.Value
}
if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.SuperGroupChatType {
if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.SuperGroupChatType && len(m) > 0 {
var conversations []*tablerelation.ConversationModel
for _, ownerUserID := range req.UserIDs {
conversation2 := conversation
@ -175,16 +183,17 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
c.conversationNotificationSender.ConversationSetPrivateNotification(ctx, userID, req.Conversation.UserID, req.Conversation.IsPrivateChat.Value, req.Conversation.ConversationID)
}
}
if req.Conversation.BurnDuration != nil {
if req.Conversation.BurnDuration != nil && req.Conversation.BurnDuration.Value != conv.BurnDuration {
m["burn_duration"] = req.Conversation.BurnDuration.Value
}
err := c.conversationDatabase.SetUsersConversationFiledTx(ctx, req.UserIDs, &conversation, m)
if err != nil {
if len(m) > 0 {
if err := c.conversationDatabase.SetUsersConversationFiledTx(ctx, req.UserIDs, &conversation, m); err != nil {
return nil, err
}
for _, v := range req.UserIDs {
c.conversationNotificationSender.ConversationChangeNotification(ctx, v, []string{req.Conversation.ConversationID})
}
}
return &pbconversation.SetConversationsResp{}, nil
}

@ -17,6 +17,8 @@ package friend
import (
"context"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/authverify"
"github.com/OpenIMSDK/tools/log"
@ -45,6 +47,7 @@ type friendServer struct {
blackDatabase controller.BlackDatabase
userRpcClient *rpcclient.UserRpcClient
notificationSender *notification.FriendNotificationSender
conversationRpcClient rpcclient.ConversationRpcClient
RegisterCenter registry.SvcDiscoveryRegistry
}
@ -82,6 +85,7 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
userRpcClient: &userRpcClient,
notificationSender: notificationSender,
RegisterCenter: client,
conversationRpcClient: rpcclient.NewConversationRpcClient(client),
})
return nil
}
@ -131,17 +135,22 @@ func (s *friendServer) ImportFriends(
if _, err := s.userRpcClient.GetUsersInfo(ctx, append([]string{req.OwnerUserID}, req.FriendUserIDs...)); err != nil {
return nil, err
}
if utils.Contain(req.OwnerUserID, req.FriendUserIDs...) {
return nil, errs.ErrCanNotAddYourself.Wrap()
}
if utils.Duplicate(req.FriendUserIDs) {
return nil, errs.ErrArgs.Wrap("friend userID repeated")
}
if err := s.friendDatabase.BecomeFriends(ctx, req.OwnerUserID, req.FriendUserIDs, constant.BecomeFriendByImport); err != nil {
return nil, err
}
for _, userID := range req.FriendUserIDs {
s.notificationSender.FriendApplicationAgreedNotification(ctx, &pbfriend.RespondFriendApplyReq{
FromUserID: req.OwnerUserID,
ToUserID: userID,
HandleResult: constant.FriendResponseAgree,
})
}
return &pbfriend.ImportFriendResp{}, nil
}
@ -350,3 +359,66 @@ func (s *friendServer) GetFriendIDs(
}
return resp, nil
}
func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *pbfriend.GetSpecifiedFriendsInfoReq) (*pbfriend.GetSpecifiedFriendsInfoResp, error) {
if len(req.UserIDList) == 0 {
return nil, errs.ErrArgs.Wrap("userIDList is empty")
}
if utils.Duplicate(req.UserIDList) {
return nil, errs.ErrArgs.Wrap("userIDList repeated")
}
userMap, err := s.userRpcClient.GetUsersInfoMap(ctx, req.UserIDList)
if err != nil {
return nil, err
}
friends, err := s.friendDatabase.FindFriendsWithError(ctx, req.OwnerUserID, req.UserIDList)
if err != nil {
return nil, err
}
blacks, err := s.blackDatabase.FindBlackInfos(ctx, req.OwnerUserID, req.UserIDList)
if err != nil {
return nil, err
}
friendMap := utils.SliceToMap(friends, func(e *tablerelation.FriendModel) string {
return e.FriendUserID
})
blackMap := utils.SliceToMap(blacks, func(e *tablerelation.BlackModel) string {
return e.BlockUserID
})
resp := &pbfriend.GetSpecifiedFriendsInfoResp{
Infos: make([]*pbfriend.GetSpecifiedFriendsInfoInfo, 0, len(req.UserIDList)),
}
for _, userID := range req.UserIDList {
user := userMap[userID]
if user == nil {
continue
}
var friendInfo *sdkws.FriendInfo
if friend := friendMap[userID]; friend != nil {
friendInfo = &sdkws.FriendInfo{
OwnerUserID: friend.OwnerUserID,
Remark: friend.Remark,
CreateTime: friend.CreateTime.UnixMilli(),
AddSource: friend.AddSource,
OperatorUserID: friend.OperatorUserID,
Ex: friend.Ex,
}
}
var blackInfo *sdkws.BlackInfo
if black := blackMap[userID]; black != nil {
blackInfo = &sdkws.BlackInfo{
OwnerUserID: black.OwnerUserID,
CreateTime: black.CreateTime.UnixMilli(),
AddSource: black.AddSource,
OperatorUserID: black.OperatorUserID,
Ex: black.Ex,
}
}
resp.Infos = append(resp.Infos, &pbfriend.GetSpecifiedFriendsInfoInfo{
UserInfo: user,
FriendInfo: friendInfo,
BlackInfo: blackInfo,
})
}
return resp, nil
}

@ -22,12 +22,7 @@ import (
relationtb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
)
func (s *groupServer) FindGroupMember(
ctx context.Context,
groupIDs []string,
userIDs []string,
roleLevels []int32,
) ([]*relationtb.GroupMemberModel, error) {
func (s *groupServer) FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relationtb.GroupMemberModel, error) {
members, err := s.GroupDatabase.FindGroupMember(ctx, groupIDs, userIDs, roleLevels)
if err != nil {
return nil, err

@ -114,6 +114,24 @@ type groupServer struct {
msgRpcClient rpcclient.MessageRpcClient
}
func (s *groupServer) NotificationUserInfoUpdate(ctx context.Context, req *pbgroup.NotificationUserInfoUpdateReq) (*pbgroup.NotificationUserInfoUpdateResp, error) {
defer log.ZDebug(ctx, "return")
members, err := s.GroupDatabase.FindGroupMember(ctx, nil, []string{req.UserID}, nil)
if err != nil {
return nil, err
}
for _, member := range members {
if member.Nickname != "" && member.FaceURL != "" {
continue
}
if err := s.Notification.GroupMemberInfoSetNotification(ctx, member.GroupID, member.UserID); err != nil {
log.ZError(ctx, "setGroupMemberInfo notification failed", err, "member", member.UserID, "groupID", member.GroupID)
}
}
return &pbgroup.NotificationUserInfoUpdateResp{}, nil
}
func (s *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error {
if !authverify.IsAppManagerUid(ctx) {
groupMember, err := s.GroupDatabase.TakeGroupMember(ctx, groupID, mcontext.GetOpUserID(ctx))

@ -128,7 +128,9 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe
func (m *msgServer) encapsulateMsgData(msg *sdkws.MsgData) {
msg.ServerMsgID = GetMsgID(msg.SendID)
if msg.SendTime == 0 {
msg.SendTime = utils.GetCurrentTimestampByMill()
}
switch msg.ContentType {
case constant.Text:
fallthrough

@ -50,6 +50,7 @@ type userServer struct {
friendNotificationSender *notification.FriendNotificationSender
userNotificationSender *notification.UserNotificationSender
friendRpcClient *rpcclient.FriendRpcClient
groupRpcClient *rpcclient.GroupRpcClient
RegisterCenter registry.SvcDiscoveryRegistry
}
@ -81,11 +82,13 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase())
database := controller.NewUserDatabase(userDB, cache, tx.NewGorm(db), userMongoDB)
friendRpcClient := rpcclient.NewFriendRpcClient(client)
groupRpcClient := rpcclient.NewGroupRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client)
u := &userServer{
UserDatabase: database,
RegisterCenter: client,
friendRpcClient: &friendRpcClient,
groupRpcClient: &groupRpcClient,
friendNotificationSender: notification.NewFriendNotificationSender(&msgRpcClient, notification.WithDBFunc(database.FindWithError)),
userNotificationSender: notification.NewUserNotificationSender(&msgRpcClient, notification.WithUserFunc(database.FindWithError)),
}
@ -125,6 +128,11 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
if err != nil {
return nil, err
}
if req.UserInfo.Nickname != "" || req.UserInfo.FaceURL != "" {
if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
log.ZError(ctx, "NotificationUserInfoUpdate", err)
}
}
for _, friendID := range friends {
s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID)
}

@ -72,8 +72,9 @@ type CustomElem struct {
Description string `mapstructure:"description"`
Extension string `mapstructure:"extension"`
}
type TextElem struct {
Text string `mapstructure:"text" validate:"required"`
Content string `json:"content" validate:"required"`
}
type RevokeElem struct {

@ -84,6 +84,13 @@ type configStruct struct {
Username string `yaml:"username"`
Password string `yaml:"password"`
Addr []string `yaml:"addr"`
TLS *struct {
CACrt string `yaml:"caCrt"`
ClientCrt string `yaml:"clientCrt"`
ClientKey string `yaml:"clientKey"`
ClientKeyPwd string `yaml:"clientKeyPwd"`
InsecureSkipVerify bool `yaml:"insecureSkipVerify"`
} `yaml:"tls"`
LatestMsgToRedis struct {
Topic string `yaml:"topic"`
} `yaml:"latestMsgToRedis"`

@ -244,7 +244,7 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, list []*user.OnlineS
}
if newPlatformIDs == nil {
onlineStatus.Status = constant.Offline
onlineStatus.PlatformIDs = nil
onlineStatus.PlatformIDs = []int32{}
newjsonData, err := json.Marshal(&onlineStatus)
if err != nil {
return errs.Wrap(err)

@ -36,6 +36,7 @@ type BlackDatabase interface {
pageNumber, showNumber int32,
) (blacks []*relation.BlackModel, total int64, err error)
FindBlackIDs(ctx context.Context, ownerUserID string) (blackIDs []string, err error)
FindBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*relation.BlackModel, err error)
// CheckIn 检查user2是否在user1的黑名单列表中(inUser1Blacks==true) 检查user1是否在user2的黑名单列表中(inUser2Blacks==true)
CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Blacks bool, inUser2Blacks bool, err error)
}
@ -102,3 +103,7 @@ func (b *blackDatabase) CheckIn(
func (b *blackDatabase) FindBlackIDs(ctx context.Context, ownerUserID string) (blackIDs []string, err error) {
return b.cache.GetBlackIDs(ctx, ownerUserID)
}
func (b *blackDatabase) FindBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*relation.BlackModel, err error) {
return b.black.FindOwnerBlackInfos(ctx, ownerUserID, userIDs)
}

@ -41,83 +41,34 @@ type GroupDatabase interface {
TakeGroup(ctx context.Context, groupID string) (group *relationtb.GroupModel, err error)
FindGroup(ctx context.Context, groupIDs []string) (groups []*relationtb.GroupModel, err error)
FindNotDismissedGroup(ctx context.Context, groupIDs []string) (groups []*relationtb.GroupModel, err error)
SearchGroup(
ctx context.Context,
keyword string,
pageNumber, showNumber int32,
) (uint32, []*relationtb.GroupModel, error)
SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationtb.GroupModel, error)
UpdateGroup(ctx context.Context, groupID string, data map[string]any) error
DismissGroup(ctx context.Context, groupID string, deleteMember bool) error // 解散群,并删除群成员
GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error)
// GroupMember
TakeGroupMember(
ctx context.Context,
groupID string,
userID string,
) (groupMember *relationtb.GroupMemberModel, err error)
TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationtb.GroupMemberModel, err error)
TakeGroupOwner(ctx context.Context, groupID string) (*relationtb.GroupMemberModel, error)
FindGroupMember(
ctx context.Context,
groupIDs []string,
userIDs []string,
roleLevels []int32,
) ([]*relationtb.GroupMemberModel, error)
FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relationtb.GroupMemberModel, error)
FindGroupMemberUserID(ctx context.Context, groupID string) ([]string, error)
FindGroupMemberNum(ctx context.Context, groupID string) (uint32, error)
FindUserManagedGroupID(ctx context.Context, userID string) (groupIDs []string, err error)
PageGroupRequest(
ctx context.Context,
groupIDs []string,
pageNumber, showNumber int32,
) (uint32, []*relationtb.GroupRequestModel, error)
// PageGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber,
// showNumber int32) (uint32, []*relationtb.GroupMemberModel, error)
PageGetJoinGroup(
ctx context.Context,
userID string,
pageNumber, showNumber int32,
) (total uint32, totalGroupMembers []*relationtb.GroupMemberModel, err error)
PageGetGroupMember(
ctx context.Context,
groupID string,
pageNumber, showNumber int32,
) (total uint32, totalGroupMembers []*relationtb.GroupMemberModel, err error)
SearchGroupMember(
ctx context.Context,
keyword string,
groupIDs []string,
userIDs []string,
roleLevels []int32,
pageNumber, showNumber int32,
) (uint32, []*relationtb.GroupMemberModel, error)
HandlerGroupRequest(
ctx context.Context,
groupID string,
userID string,
handledMsg string,
handleResult int32,
member *relationtb.GroupMemberModel,
) error
PageGroupRequest(ctx context.Context, groupIDs []string, pageNumber, showNumber int32) (uint32, []*relationtb.GroupRequestModel, error)
PageGetJoinGroup(ctx context.Context, userID string, pageNumber, showNumber int32) (total uint32, totalGroupMembers []*relationtb.GroupMemberModel, err error)
PageGetGroupMember(ctx context.Context, groupID string, pageNumber, showNumber int32) (total uint32, totalGroupMembers []*relationtb.GroupMemberModel, err error)
SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationtb.GroupMemberModel, error)
HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationtb.GroupMemberModel) error
DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error
MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationtb.GroupSimpleUserID, error)
MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error)
TransferGroupOwner(
ctx context.Context,
groupID string,
oldOwnerUserID, newOwnerUserID string,
roleLevel int32,
) error // 转让群
TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error // 转让群
UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error
UpdateGroupMembers(ctx context.Context, data []*relationtb.BatchUpdateGroupMember) error
// GroupRequest
CreateGroupRequest(ctx context.Context, requests []*relationtb.GroupRequestModel) error
TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationtb.GroupRequestModel, error)
FindGroupRequests(ctx context.Context, groupID string, userIDs []string) (int64, []*relationtb.GroupRequestModel, error)
PageGroupRequestUser(
ctx context.Context,
userID string,
pageNumber, showNumber int32,
) (uint32, []*relationtb.GroupRequestModel, error)
PageGroupRequestUser(ctx context.Context, userID string, pageNumber, showNumber int32) (uint32, []*relationtb.GroupRequestModel, error)
// SuperGroupModelInterface
FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelationtb.SuperGroupModel, error)
FindJoinSuperGroup(ctx context.Context, userID string) ([]string, error)
@ -310,12 +261,22 @@ func (g *groupDatabase) PageGroupRequest(
return g.groupRequestDB.PageGroup(ctx, groupIDs, pageNumber, showNumber)
}
func (g *groupDatabase) FindGroupMember(
ctx context.Context,
groupIDs []string,
userIDs []string,
roleLevels []int32,
) (totalGroupMembers []*relationtb.GroupMemberModel, err error) {
func (g *groupDatabase) FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) (totalGroupMembers []*relationtb.GroupMemberModel, err error) {
if len(groupIDs) == 0 && len(roleLevels) == 0 && len(userIDs) == 1 {
gIDs, err := g.cache.GetJoinedGroupIDs(ctx, userIDs[0])
if err != nil {
return nil, err
}
var res []*relationtb.GroupMemberModel
for _, groupID := range gIDs {
v, err := g.cache.GetGroupMemberInfo(ctx, groupID, userIDs[0])
if err != nil {
return nil, err
}
res = append(res, v)
}
return res, nil
}
if len(roleLevels) == 0 {
for _, groupID := range groupIDs {
groupMembers, err := g.cache.GetGroupMembersInfo(ctx, groupID, userIDs)
@ -451,13 +412,8 @@ func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string
return m, nil
}
func (g *groupDatabase) TransferGroupOwner(
ctx context.Context,
groupID string,
oldOwnerUserID, newOwnerUserID string,
roleLevel int32,
) error {
if err := g.tx.Transaction(func(tx any) error {
func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error {
return g.tx.Transaction(func(tx any) error {
rowsAffected, err := g.groupMemberDB.NewTx(tx).UpdateRoleLevel(ctx, groupID, oldOwnerUserID, roleLevel)
if err != nil {
return err
@ -472,11 +428,8 @@ func (g *groupDatabase) TransferGroupOwner(
if rowsAffected != 1 {
return utils.Wrap(fmt.Errorf("newOwnerUserID %s rowsAffected = %d", newOwnerUserID, rowsAffected), "")
}
return nil
}); err != nil {
return err
}
return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).ExecDel(ctx)
return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).DelGroupMembersHash(groupID).ExecDel(ctx)
})
}
func (g *groupDatabase) UpdateGroupMember(

@ -17,6 +17,8 @@ package relation
import (
"context"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/ormutil"
"gorm.io/gorm"
@ -103,3 +105,7 @@ func (b *BlackGorm) FindBlackUserIDs(ctx context.Context, ownerUserID string) (b
"",
)
}
func (b *BlackGorm) FindOwnerBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*relation.BlackModel, err error) {
return blacks, errs.Wrap(b.db(ctx).Where("owner_user_id = ? and block_user_id in ?", ownerUserID, userIDs).Find(&blacks).Error)
}

@ -43,10 +43,7 @@ type BlackModelInterface interface {
Update(ctx context.Context, blacks []*BlackModel) (err error)
Find(ctx context.Context, blacks []*BlackModel) (blackList []*BlackModel, err error)
Take(ctx context.Context, ownerUserID, blockUserID string) (black *BlackModel, err error)
FindOwnerBlacks(
ctx context.Context,
ownerUserID string,
pageNumber, showNumber int32,
) (blacks []*BlackModel, total int64, err error)
FindOwnerBlacks(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (blacks []*BlackModel, total int64, err error)
FindOwnerBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*BlackModel, err error)
FindBlackUserIDs(ctx context.Context, ownerUserID string) (blackUserIDs []string, err error)
}

@ -40,6 +40,7 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer {
consumerConfig.Net.SASL.User = config.Config.Kafka.Username
consumerConfig.Net.SASL.Password = config.Config.Kafka.Password
}
SetupTLSConfig(consumerConfig)
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
if err != nil {
panic(err.Error())

@ -19,6 +19,8 @@ import (
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/Shopify/sarama"
)
@ -35,11 +37,17 @@ type MConsumerGroupConfig struct {
}
func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup {
config := sarama.NewConfig()
config.Version = consumerConfig.KafkaVersion
config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
config.Consumer.Return.Errors = consumerConfig.IsReturnErr
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config)
consumerGroupConfig := sarama.NewConfig()
consumerGroupConfig.Version = consumerConfig.KafkaVersion
consumerGroupConfig.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
consumerGroupConfig.Consumer.Return.Errors = consumerConfig.IsReturnErr
if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
consumerGroupConfig.Net.SASL.Enable = true
consumerGroupConfig.Net.SASL.User = config.Config.Kafka.Username
consumerGroupConfig.Net.SASL.Password = config.Config.Kafka.Password
}
SetupTLSConfig(consumerGroupConfig)
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig)
if err != nil {
panic(err.Error())
}

@ -60,6 +60,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
}
p.addr = addr
p.topic = topic
SetupTLSConfig(p.config)
var producer sarama.SyncProducer
var err error
for i := 0; i <= maxRetry; i++ {

@ -0,0 +1,21 @@
package kafka
import (
"github.com/Shopify/sarama"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tls"
)
// SetupTLSConfig set up the TLS config from config file.
func SetupTLSConfig(cfg *sarama.Config) {
if config.Config.Kafka.TLS != nil {
cfg.Net.TLS.Enable = true
cfg.Net.TLS.Config = tls.NewTLSConfig(
config.Config.Kafka.TLS.ClientCrt,
config.Config.Kafka.TLS.ClientKey,
config.Config.Kafka.TLS.CACrt,
[]byte(config.Config.Kafka.TLS.ClientKeyPwd),
)
}
}

@ -0,0 +1,70 @@
package tls
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"errors"
"os"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
)
func decryptPEM(data []byte, passphrase []byte) ([]byte, error) {
if len(passphrase) == 0 {
return data, nil
}
b, _ := pem.Decode(data)
d, err := x509.DecryptPEMBlock(b, passphrase)
if err != nil {
return nil, err
}
return pem.EncodeToMemory(&pem.Block{
Type: b.Type,
Bytes: d,
}), nil
}
func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
return decryptPEM(data, pwd)
}
// NewTLSConfig setup the TLS config from general config file.
func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte) *tls.Config {
tlsConfig := tls.Config{}
if clientCertFile != "" && clientKeyFile != "" {
certPEMBlock, err := os.ReadFile(clientCertFile)
if err != nil {
panic(err)
}
keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd)
if err != nil {
panic(err)
}
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
if err != nil {
panic(err)
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
caCert, err := os.ReadFile(caCertFile)
if err != nil {
panic(err)
}
caCertPool := x509.NewCertPool()
ok := caCertPool.AppendCertsFromPEM(caCert)
if !ok {
panic(errors.New("not a valid CA cert"))
}
tlsConfig.RootCAs = caCertPool
tlsConfig.InsecureSkipVerify = config.Config.Kafka.TLS.InsecureSkipVerify
return &tlsConfig
}

@ -208,3 +208,10 @@ func (g *GroupRpcClient) DismissGroup(ctx context.Context, groupID string) error
})
return err
}
func (g *GroupRpcClient) NotificationUserInfoUpdate(ctx context.Context, userID string) error {
_, err := g.Client.NotificationUserInfoUpdate(ctx, &group.NotificationUserInfoUpdateReq{
UserID: userID,
})
return err
}

@ -18,6 +18,8 @@ import (
"context"
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/authverify"
"github.com/OpenIMSDK/protocol/constant"
pbgroup "github.com/OpenIMSDK/protocol/group"
"github.com/OpenIMSDK/protocol/sdkws"
@ -235,6 +237,14 @@ func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws
}
userID := mcontext.GetOpUserID(ctx)
if groupID != "" {
if authverify.IsManagerUserID(userID) {
*opUser = &sdkws.GroupMemberFullInfo{
GroupID: groupID,
UserID: userID,
RoleLevel: constant.GroupAdmin,
AppMangerLevel: constant.AppAdmin,
}
} else {
member, err := g.db.TakeGroupMember(ctx, groupID, userID)
if err == nil {
*opUser = g.groupMemberDB2PB(member, 0)
@ -242,6 +252,7 @@ func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws
return err
}
}
}
user, err := g.getUser(ctx, userID)
if err != nil {
return err

@ -147,7 +147,10 @@ func (u *UserRpcClient) GetUserGlobalMsgRecvOpt(ctx context.Context, userID stri
resp, err := u.Client.GetGlobalRecvMessageOpt(ctx, &user.GetGlobalRecvMessageOptReq{
UserID: userID,
})
return resp.GlobalRecvMsgOpt, err
if err != nil {
return 0, err
}
return resp.GlobalRecvMsgOpt, nil
}
// Access verifies the access rights for the provided user ID.

@ -39,6 +39,7 @@ import (
"gorm.io/gorm"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/minio/minio-go/v7/pkg/credentials"
)
@ -274,6 +275,7 @@ func checkKafka() error {
cfg.Net.SASL.User = config.Config.Kafka.Username
cfg.Net.SASL.Password = config.Config.Kafka.Password
}
kafka.SetupTLSConfig(cfg)
kafkaClient, err := sarama.NewClient(config.Config.Kafka.Addr, cfg)
if err != nil {
return errs.Wrap(err)

Loading…
Cancel
Save