diff --git a/Dockerfile b/Dockerfile index 3f765805c..e082dd64c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -43,7 +43,7 @@ COPY --from=builder $SERVER_DIR/start-config.yml $SERVER_DIR/ COPY --from=builder $SERVER_DIR/go.mod $SERVER_DIR/ COPY --from=builder $SERVER_DIR/go.sum $SERVER_DIR/ -RUN go get github.com/openimsdk/gomake@v0.0.13 +RUN go get github.com/openimsdk/gomake@v0.0.14-alpha.5 # Set the command to run when the container starts ENTRYPOINT ["sh", "-c", "mage start && tail -f /dev/null"] diff --git a/config/openim-crontask.yml b/config/openim-crontask.yml index 9bbccfd25..3839104a4 100644 --- a/config/openim-crontask.yml +++ b/config/openim-crontask.yml @@ -1,2 +1,3 @@ -chatRecordsClearTime: "0 2 * * *" +cronExecuteTime: "0 2 * * *" retainChatRecords: 365 +fileExpireTime: 90 diff --git a/config/webhooks.yml b/config/webhooks.yml index c7839d4f2..11a85ba0c 100644 --- a/config/webhooks.yml +++ b/config/webhooks.yml @@ -13,6 +13,9 @@ afterUpdateUserInfoEx: afterSendSingleMsg: enable: false timeout: 5 + # Only the senID/recvID specified in attentionIds will send the callback + # if not set, all user messages will be callback + attentionIds: [] beforeSendGroupMsg: enable: false timeout: 5 diff --git a/go.mod b/go.mod index e4f45e963..285f0d965 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.69-alpha.24 - github.com/openimsdk/tools v0.0.49-alpha.25 + github.com/openimsdk/tools v0.0.49-alpha.39 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 @@ -35,7 +35,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/kelindar/bitmap v1.5.2 github.com/likexian/gokit v0.25.13 - github.com/openimsdk/gomake v0.0.13 + github.com/openimsdk/gomake v0.0.14-alpha.5 github.com/redis/go-redis/v9 v9.4.0 github.com/robfig/cron/v3 v3.0.1 github.com/shirou/gopsutil v3.21.11+incompatible @@ -176,5 +176,3 @@ require ( golang.org/x/crypto v0.21.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) - -//replace github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol diff --git a/go.sum b/go.sum index b0f495e82..f452bcd87 100644 --- a/go.sum +++ b/go.sum @@ -268,12 +268,12 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0= -github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.24 h1:TYcNJeWOTuE40UQ54eNPdDdy0KTOh9rAOgax8lCyhDc= -github.com/openimsdk/protocol v0.0.69-alpha.24/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.25 h1:OpRPwDZ2xWX7Zj5kyfZhryu/NfZTrsRVr2GFwu1HQHI= -github.com/openimsdk/tools v0.0.49-alpha.25/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU= +github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= +github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +github.com/openimsdk/protocol v0.0.69-alpha.22 h1:kifZWVNDkg9diXFJUJ/Q9xFc80cveBhc+1dUXcE9xHQ= +github.com/openimsdk/protocol v0.0.69-alpha.22/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/tools v0.0.49-alpha.39 h1:bl5+q7xHsc/j1NnkN8/gYmn23RsNNbRizDY58d2EY1w= +github.com/openimsdk/tools v0.0.49-alpha.39/go.mod h1:zc0maZ2ohXlHd0ylY5JnCE8uqq/hslhcfcKa6iO5PCU= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/group.go b/internal/api/group.go index 91992004c..e48191ee1 100644 --- a/internal/api/group.go +++ b/internal/api/group.go @@ -67,6 +67,7 @@ func (o *GroupApi) GetGroupUsersReqApplicationList(c *gin.Context) { func (o *GroupApi) GetGroupsInfo(c *gin.Context) { a2r.Call(group.GroupClient.GetGroupsInfo, o.Client, c) + //a2r.Call(group.GroupClient.GetGroupsInfo, o.Client, c, a2r.NewNilReplaceOption(group.GroupClient.GetGroupsInfo)) } func (o *GroupApi) KickGroupMember(c *gin.Context) { @@ -75,6 +76,7 @@ func (o *GroupApi) KickGroupMember(c *gin.Context) { func (o *GroupApi) GetGroupMembersInfo(c *gin.Context) { a2r.Call(group.GroupClient.GetGroupMembersInfo, o.Client, c) + //a2r.Call(group.GroupClient.GetGroupMembersInfo, o.Client, c, a2r.NewNilReplaceOption(group.GroupClient.GetGroupMembersInfo)) } func (o *GroupApi) GetGroupMemberList(c *gin.Context) { diff --git a/internal/api/msg.go b/internal/api/msg.go index 180342e59..ba63fbb66 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -101,6 +101,7 @@ func (m MessageApi) newUserSendMsgReq(_ *gin.Context, params *apistruct.SendMsg) SendTime: params.SendTime, Options: options, OfflinePushInfo: params.OfflinePushInfo, + Ex: params.Ex, }, } return &pbData diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 3f3ca11e8..e942f40ae 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -215,7 +215,7 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws. } func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { - log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) + log.ZDebug(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) var pushToUserIDs []string if err = c.webhookBeforeGroupOnlinePush(ctx, &c.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg, &pushToUserIDs); err != nil { diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 320fb1d52..804375e4f 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -61,7 +61,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg userRpcClient: &userRpcClient, RegisterCenter: client, authDatabase: controller.NewAuthDatabase( - redis2.NewTokenCacheModel(rdb), + redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire), config.Share.Secret, config.RpcConfig.TokenPolicy.Expire, ), diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 5b82b8ceb..e3d1d4dfe 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -17,17 +17,18 @@ package group import ( "context" "fmt" + "math/big" + "math/rand" + "strconv" + "strings" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/common" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/localcache" - "math/big" - "math/rand" - "strconv" - "strings" - "time" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" @@ -531,6 +532,14 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou if datautil.Contain(opUserID, req.KickedUserIDs...) { return nil, errs.ErrArgs.WrapMsg("opUserID in KickedUserIDs") } + owner, err := s.db.TakeGroupOwner(ctx, req.GroupID) + if err != nil { + return nil, err + } + if datautil.Contain(owner.UserID, req.KickedUserIDs...) { + return nil, errs.ErrArgs.WrapMsg("ownerUID can not Kick") + } + members, err := s.db.FindGroupMembers(ctx, req.GroupID, append(req.KickedUserIDs, opUserID)) if err != nil { return nil, err @@ -590,7 +599,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou FaceURL: group.FaceURL, OwnerUserID: ownerUserID, CreateTime: group.CreateTime.UnixMilli(), - MemberCount: num, + MemberCount: num - uint32(len(req.KickedUserIDs)), Ex: group.Ex, Status: group.Status, CreatorUserID: group.CreatorUserID, diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index ee4f00ad3..a8824962d 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -36,6 +36,12 @@ import ( "github.com/openimsdk/tools/utils/stringutil" ) +// GroupApplicationReceiver +const ( + applicantReceiver = iota + adminReceiver +) + func NewGroupNotificationSender(db controller.GroupDatabase, msgRpcClient *rpcclient.MessageRpcClient, userRpcClient *rpcclient.UserRpcClient, config *Config, fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)) *GroupNotificationSender { return &GroupNotificationSender{ NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)), @@ -418,15 +424,17 @@ func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx conte if err != nil { return } - tips := &sdkws.GroupApplicationAcceptedTips{Group: group, HandleMsg: req.HandledMsg} - if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + + var opUser *sdkws.GroupMemberFullInfo + if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil { return } for _, userID := range append(userIDs, req.FromUserID) { + tips := &sdkws.GroupApplicationAcceptedTips{Group: group, OpUser: opUser, HandleMsg: req.HandledMsg} if userID == req.FromUserID { - tips.ReceiverAs = 0 + tips.ReceiverAs = applicantReceiver } else { - tips.ReceiverAs = 1 + tips.ReceiverAs = adminReceiver } g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationAcceptedNotification, tips) } @@ -449,15 +457,17 @@ func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx conte if err != nil { return } - tips := &sdkws.GroupApplicationRejectedTips{Group: group, HandleMsg: req.HandledMsg} - if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + + var opUser *sdkws.GroupMemberFullInfo + if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil { return } for _, userID := range append(userIDs, req.FromUserID) { + tips := &sdkws.GroupApplicationAcceptedTips{Group: group, OpUser: opUser, HandleMsg: req.HandledMsg} if userID == req.FromUserID { - tips.ReceiverAs = 0 + tips.ReceiverAs = applicantReceiver } else { - tips.ReceiverAs = 1 + tips.ReceiverAs = adminReceiver } g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationRejectedNotification, tips) } diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index 3793d0c7e..75d060c0e 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -82,6 +82,9 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou } return false }) + if vl.LogLen > 0 { + hasGroupUpdate = true + } return vl, nil }, CacheMaxVersion: s.db.FindMaxGroupMemberVersionCache, diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index 10404675e..be58d7504 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -83,6 +83,11 @@ func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config if msg.MsgData.ContentType == constant.Typing { return } + // According to the attentionIds configuration, only some users are sent + attentionIds := after.AttentionIds + if attentionIds != nil && !datautil.Contain(msg.MsgData.RecvID, attentionIds...) && !datautil.Contain(msg.MsgData.SendID, attentionIds...) { + return + } cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), RecvID: msg.MsgData.RecvID, diff --git a/internal/rpc/msg/seq.go b/internal/rpc/msg/seq.go index 27465c210..1ebec4a71 100644 --- a/internal/rpc/msg/seq.go +++ b/internal/rpc/msg/seq.go @@ -16,13 +16,15 @@ package msg import ( "context" + "github.com/openimsdk/tools/errs" + "github.com/redis/go-redis/v9" pbmsg "github.com/openimsdk/protocol/msg" ) func (m *msgServer) GetConversationMaxSeq(ctx context.Context, req *pbmsg.GetConversationMaxSeqReq) (*pbmsg.GetConversationMaxSeqResp, error) { maxSeq, err := m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID) - if err != nil { + if err != nil && errs.Unwrap(err) != redis.Nil { return nil, err } return &pbmsg.GetConversationMaxSeqResp{MaxSeq: maxSeq}, nil diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index 4cb1b81d0..54cbc30c1 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -19,13 +19,17 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "path" "strconv" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "go.mongodb.org/mongo-driver/mongo" + "github.com/google/uuid" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" + "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -283,6 +287,52 @@ func (t *thirdServer) apiAddress(prefix, name string) string { return prefix + name } +func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) { + var conf config.Third + expireTime := time.UnixMilli(req.ExpireTime) + findPagination := &sdkws.RequestPagination{ + PageNumber: 1, + ShowNumber: 1000, + } + for { + total, models, err := t.s3dataBase.FindByExpires(ctx, expireTime, findPagination) + if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { + return nil, errs.Wrap(err) + } + needDelObjectKeys := make([]string, 0) + for _, model := range models { + needDelObjectKeys = append(needDelObjectKeys, model.Key) + } + + needDelObjectKeys = datautil.Distinct(needDelObjectKeys) + for _, key := range needDelObjectKeys { + count, err := t.s3dataBase.FindNotDelByS3(ctx, key, expireTime) + if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { + return nil, errs.Wrap(err) + } + if int(count) < 1 { + thumbnailKey, err := t.s3dataBase.GetImageThumbnailKey(ctx, key) + if err != nil { + return nil, errs.Wrap(err) + } + t.s3dataBase.DeleteObject(ctx, thumbnailKey) + t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, needDelObjectKeys...) + t.s3dataBase.DeleteObject(ctx, key) + } + } + for _, model := range models { + err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name) + if err != nil { + return nil, errs.Wrap(err) + } + } + if total < int64(findPagination.ShowNumber) { + break + } + } + return &third.DeleteOutdatedDataResp{}, nil +} + type FormDataMate struct { Name string `json:"name"` Size int64 `json:"size"` diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 0b96077ec..1da923e95 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -62,6 +62,11 @@ type userServer struct { webhookClient *webhook.Client } +func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUserOnlineStatusReq) (*pbuser.SetUserOnlineStatusResp, error) { + //TODO implement me + panic("implement me") +} + type Config struct { RpcConfig config.User RedisConfig config.Redis diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 50f328b55..1ef4943cd 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -20,6 +20,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mw" "google.golang.org/grpc" @@ -39,7 +40,7 @@ type CronTaskConfig struct { } func Start(ctx context.Context, config *CronTaskConfig) error { - log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime, "msgDestructTime", config.CronTask.RetainChatRecords) + log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords) if config.CronTask.RetainChatRecords < 1 { return errs.New("msg destruct time must be greater than 1").Wrap() } @@ -66,16 +67,31 @@ func Start(ctx context.Context, config *CronTaskConfig) error { } log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now)) } - delFile := func() { + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearFunc); err != nil { + return errs.Wrap(err) + } + tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) + if err != nil { + return err } - if _, err := crontab.AddFunc(config.CronTask.ChatRecordsClearTime, clearFunc); err != nil { - return errs.Wrap(err) + thirdClient := third.NewThirdClient(tConn) + + deleteFunc := func() { + now := time.Now() + deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) + ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) + log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) + if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil { + log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now)) + return + } + log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) } - if _, err := crontab.AddFunc(config.CronTask.ChatRecordsClearTime, delFile); err != nil { + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteFunc); err != nil { return errs.Wrap(err) } - log.ZInfo(ctx, "start cron task", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime) + log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) crontab.Start() <-ctx.Done() return nil diff --git a/pkg/apistruct/manage.go b/pkg/apistruct/manage.go index e79b47722..f4deb9fb1 100644 --- a/pkg/apistruct/manage.go +++ b/pkg/apistruct/manage.go @@ -15,7 +15,7 @@ package apistruct import ( - sdkws "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/protocol/sdkws" ) // SendMsg defines the structure for sending messages with various metadata. @@ -55,6 +55,9 @@ type SendMsg struct { // OfflinePushInfo contains information for offline push notifications. OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"` + + // Ex stores extended fields + Ex string `json:"ex"` } // SendMsgReq extends SendMsg with the requirement of RecvID when SessionType indicates a one-on-one or notification chat. diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index a4e613560..3d6546d9c 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -105,9 +105,9 @@ type API struct { } type CronTask struct { - ChatRecordsClearTime string `mapstructure:"chatRecordsClearTime"` - RetainChatRecords int `mapstructure:"retainChatRecords"` - FileTime int + CronExecuteTime string `mapstructure:"cronExecuteTime"` + RetainChatRecords int `mapstructure:"retainChatRecords"` + FileExpireTime int `mapstructure:"fileExpireTime"` } type OfflinePushConfig struct { @@ -340,8 +340,9 @@ type BeforeConfig struct { } type AfterConfig struct { - Enable bool `mapstructure:"enable"` - Timeout int `mapstructure:"timeout"` + Enable bool `mapstructure:"enable"` + Timeout int `mapstructure:"timeout"` + AttentionIds []string `mapstructure:"attentionIds"` } type Share struct { diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 069c92012..b531daa47 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -158,7 +158,6 @@ func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusC } return nil case <-netDone: - close(netDone) return netErr } } diff --git a/pkg/common/storage/cache/redis/token.go b/pkg/common/storage/cache/redis/token.go index 6098a666c..b82259658 100644 --- a/pkg/common/storage/cache/redis/token.go +++ b/pkg/common/storage/cache/redis/token.go @@ -21,22 +21,36 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/stringutil" "github.com/redis/go-redis/v9" + "time" ) type tokenCache struct { - rdb redis.UniversalClient + rdb redis.UniversalClient + accessExpire time.Duration } -func NewTokenCacheModel(rdb redis.UniversalClient) cache.TokenModel { - return &tokenCache{ - rdb: rdb, - } +func NewTokenCacheModel(rdb redis.UniversalClient, accessExpire int64) cache.TokenModel { + c := &tokenCache{rdb: rdb} + c.accessExpire = c.getExpireTime(accessExpire) + return c } -func (c *tokenCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { +func (c *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), token, flag).Err()) } +// SetTokenFlagEx set token and flag with expire time +func (c *tokenCache) SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error { + key := cachekey.GetTokenKey(userID, platformID) + if err := c.rdb.HSet(ctx, key, token, flag).Err(); err != nil { + return errs.Wrap(err) + } + if err := c.rdb.Expire(ctx, key, c.accessExpire).Err(); err != nil { + return errs.Wrap(err) + } + return nil +} + func (c *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) { m, err := c.rdb.HGetAll(ctx, cachekey.GetTokenKey(userID, platformID)).Result() if err != nil { @@ -61,3 +75,7 @@ func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, pla func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err()) } + +func (c *tokenCache) getExpireTime(t int64) time.Duration { + return time.Hour * 24 * time.Duration(t) +} diff --git a/pkg/common/storage/cache/token.go b/pkg/common/storage/cache/token.go index 55b3321ef..4a0fee087 100644 --- a/pkg/common/storage/cache/token.go +++ b/pkg/common/storage/cache/token.go @@ -5,7 +5,9 @@ import ( ) type TokenModel interface { - AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error + SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error + // SetTokenFlagEx set token and flag with expire time + SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error diff --git a/pkg/common/storage/controller/auth.go b/pkg/common/storage/controller/auth.go index 321583743..fbfe30836 100644 --- a/pkg/common/storage/controller/auth.go +++ b/pkg/common/storage/controller/auth.go @@ -55,6 +55,7 @@ func (a *authDatabase) SetTokenMapByUidPid(ctx context.Context, userID string, p // Create Token. func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) { + isCreate := true // flag is create or update tokens, err := a.cache.GetTokensWithoutError(ctx, userID, platformID) if err != nil { return "", err @@ -65,6 +66,9 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI if err != nil || v != constant.NormalToken { deleteTokenKey = append(deleteTokenKey, k) } + if v == constant.NormalToken { + isCreate = false + } } if len(deleteTokenKey) != 0 { err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey) @@ -79,5 +83,17 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI if err != nil { return "", errs.WrapMsg(err, "token.SignedString") } - return tokenString, a.cache.AddTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken) + + if isCreate { + // should create,should specify expiration time + if err = a.cache.SetTokenFlagEx(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil { + return "", err + } + } else { + // should update + if err = a.cache.SetTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil { + return "", err + } + } + return tokenString, nil } diff --git a/pkg/common/storage/controller/friend.go b/pkg/common/storage/controller/friend.go index 4d3a96f12..e402f5980 100644 --- a/pkg/common/storage/controller/friend.go +++ b/pkg/common/storage/controller/friend.go @@ -335,9 +335,6 @@ func (f *friendDatabase) FindFriendsWithError(ctx context.Context, ownerUserID s if err != nil { return } - if len(friends) != len(friendUserIDs) { - err = errs.ErrRecordNotFound.Wrap() - } return } diff --git a/pkg/common/storage/controller/group.go b/pkg/common/storage/controller/group.go index 27b6f4366..3a5f48d4c 100644 --- a/pkg/common/storage/controller/group.go +++ b/pkg/common/storage/controller/group.go @@ -378,6 +378,7 @@ func (g *groupDatabase) DeleteGroupMember(ctx context.Context, groupID string, u DelGroupMembersInfo(groupID, userIDs...). DelGroupAllRoleLevel(groupID). DelMaxGroupMemberVersion(groupID). + DelMaxJoinGroupVersion(userIDs...). ChainExecDel(ctx) }) } @@ -400,10 +401,7 @@ func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error { return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { - if err := g.groupMemberDB.UpdateRoleLevel(ctx, groupID, oldOwnerUserID, roleLevel); err != nil { - return err - } - if err := g.groupMemberDB.UpdateRoleLevel(ctx, groupID, newOwnerUserID, constant.GroupOwner); err != nil { + if err := g.groupMemberDB.UpdateUserRoleLevels(ctx, groupID, oldOwnerUserID, roleLevel, newOwnerUserID, constant.GroupOwner); err != nil { return err } c := g.cache.CloneGroupCache() diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go index b0ad61203..592659536 100644 --- a/pkg/common/storage/controller/s3.go +++ b/pkg/common/storage/controller/s3.go @@ -16,13 +16,15 @@ package controller import ( "context" - redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "path/filepath" "time" + redisCache "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/s3" "github.com/openimsdk/tools/s3/cont" "github.com/redis/go-redis/v9" @@ -38,20 +40,28 @@ type S3Database interface { SetObject(ctx context.Context, info *model.Object) error StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) + FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) + DeleteObject(ctx context.Context, name string) error + DeleteSpecifiedData(ctx context.Context, engine string, name string) error + FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) + DelS3Key(ctx context.Context, engine string, keys ...string) error + GetImageThumbnailKey(ctx context.Context, name string) (string, error) } func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database { return &s3Database{ - s3: cont.New(redis2.NewS3Cache(rdb, s3), s3), - cache: redis2.NewObjectCacheRedis(rdb, obj), - db: obj, + s3: cont.New(redisCache.NewS3Cache(rdb, s3), s3), + cache: redisCache.NewObjectCacheRedis(rdb, obj), + s3cache: redisCache.NewS3Cache(rdb, s3), + db: obj, } } type s3Database struct { - s3 *cont.Controller - cache cache.ObjectCache - db database.ObjectInfo + s3 *cont.Controller + cache cache.ObjectCache + s3cache cont.S3Cache + db database.ObjectInfo } func (s *s3Database) PartSize(ctx context.Context, size int64) (int64, error) { @@ -111,3 +121,26 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) { return s.s3.FormData(ctx, name, size, contentType, duration) } +func (s *s3Database) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) { + + return s.db.FindByExpires(ctx, duration, pagination) +} + +func (s *s3Database) DeleteObject(ctx context.Context, name string) error { + return s.s3.DeleteObject(ctx, name) +} +func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name string) error { + return s.db.Delete(ctx, engine, name) +} + +func (s *s3Database) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) { + return s.db.FindNotDelByS3(ctx, key, duration) +} + +func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error { + return s.s3cache.DelS3Key(ctx, engine, keys...) +} + +func (s *s3Database) GetImageThumbnailKey(ctx context.Context, name string) (string, error) { + return s.s3.GetImageThumbnailKey(ctx, name) +} diff --git a/pkg/common/storage/controller/third.go b/pkg/common/storage/controller/third.go index 344501466..a9c2ae403 100644 --- a/pkg/common/storage/controller/third.go +++ b/pkg/common/storage/controller/third.go @@ -16,9 +16,10 @@ package controller import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "time" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/tools/db/pagination" diff --git a/pkg/common/storage/database/group_member.go b/pkg/common/storage/database/group_member.go index c49319649..c272b6ef6 100644 --- a/pkg/common/storage/database/group_member.go +++ b/pkg/common/storage/database/group_member.go @@ -25,6 +25,7 @@ type GroupMember interface { Delete(ctx context.Context, groupID string, userIDs []string) (err error) Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error) UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) error + UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error FindMemberUserID(ctx context.Context, groupID string) (userIDs []string, err error) Take(ctx context.Context, groupID string, userID string) (groupMember *model.GroupMember, err error) TakeOwner(ctx context.Context, groupID string) (groupMember *model.GroupMember, err error) diff --git a/pkg/common/storage/database/mgo/group_member.go b/pkg/common/storage/database/mgo/group_member.go index 30f8d63b9..3eb93a10e 100644 --- a/pkg/common/storage/database/mgo/group_member.go +++ b/pkg/common/storage/database/mgo/group_member.go @@ -115,11 +115,28 @@ func (g *GroupMemberMgo) Delete(ctx context.Context, groupID string, userIDs []s func (g *GroupMemberMgo) UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) error { return mongoutil.IncrVersion(func() error { - return g.Update(ctx, groupID, userID, bson.M{"role_level": roleLevel}) + return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID}, + bson.M{"$set": bson.M{"role_level": roleLevel}}, true) }, func() error { return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate) }) } +func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error { + return mongoutil.IncrVersion(func() error { + if err := mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": firstUserID}, + bson.M{"$set": bson.M{"role_level": firstUserRoleLevel}}, true); err != nil { + return err + } + if err := mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": secondUserID}, + bson.M{"$set": bson.M{"role_level": secondUserRoleLevel}}, true); err != nil { + return err + } + + return nil + }, func() error { + return g.member.IncrVersion(ctx, groupID, []string{firstUserID, secondUserID}, model.VersionStateUpdate) + }) +} func (g *GroupMemberMgo) Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error) { if len(data) == 0 { diff --git a/pkg/common/storage/database/mgo/object.go b/pkg/common/storage/database/mgo/object.go index df4d10ec4..4242fbb53 100644 --- a/pkg/common/storage/database/mgo/object.go +++ b/pkg/common/storage/database/mgo/object.go @@ -16,10 +16,13 @@ package mgo import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/errs" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -68,3 +71,14 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model. func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error { return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine}) } +func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) { + return mongoutil.FindPage[*model.Object](ctx, o.coll, bson.M{ + "create_time": bson.M{"$lt": duration}, + }, pagination) +} +func (o *S3Mongo) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) { + return mongoutil.Count(ctx, o.coll, bson.M{ + "key": key, + "create_time": bson.M{"$gt": duration}, + }) +} diff --git a/pkg/common/storage/database/mgo/version_test.go b/pkg/common/storage/database/mgo/version_test.go new file mode 100644 index 000000000..236c61a2c --- /dev/null +++ b/pkg/common/storage/database/mgo/version_test.go @@ -0,0 +1,39 @@ +package mgo + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "testing" + "time" +) + +func Result[V any](val V, err error) V { + if err != nil { + panic(err) + } + return val +} + +func Check(err error) { + if err != nil { + panic(err) + } +} + +func TestName(t *testing.T) { + cli := Result(mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) + coll := cli.Database("openim_v3").Collection("version_test") + tmp, err := NewVersionLog(coll) + if err != nil { + panic(err) + } + vl := tmp.(*VersionLogMgo) + res, err := vl.writeLogBatch2(context.Background(), "100", []string{"1000", "1001", "1003"}, model.VersionStateInsert, time.Now()) + if err != nil { + t.Log(err) + return + } + t.Logf("%+v", res) +} diff --git a/pkg/common/storage/database/object.go b/pkg/common/storage/database/object.go index 554f71f35..8292006a0 100644 --- a/pkg/common/storage/database/object.go +++ b/pkg/common/storage/database/object.go @@ -16,11 +16,16 @@ package database import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/pagination" ) type ObjectInfo interface { SetObject(ctx context.Context, obj *model.Object) error Take(ctx context.Context, engine string, name string) (*model.Object, error) Delete(ctx context.Context, engine string, name string) error + FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) + FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) } diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index 4c00dd1f7..0109f1b1d 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -16,15 +16,19 @@ package rpccache import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" + "golang.org/x/sync/errgroup" +) + +const ( + conversationWorkerCount = 20 ) func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache { @@ -90,15 +94,33 @@ func (c *ConversationLocalCache) GetSingleConversationRecvMsgOpt(ctx context.Con } func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) { - conversations := make([]*pbconversation.Conversation, 0, len(conversationIDs)) + var ( + conversations = make([]*pbconversation.Conversation, 0, len(conversationIDs)) + conversationsChan = make(chan *pbconversation.Conversation, len(conversationIDs)) + ) + + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(conversationWorkerCount) + for _, conversationID := range conversationIDs { - conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) - if err != nil { - if errs.ErrRecordNotFound.Is(err) { - continue + conversationID := conversationID + g.Go(func() error { + conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + return nil + } + return err } - return nil, err - } + conversationsChan <- conversation + return nil + }) + } + if err := g.Wait(); err != nil { + return nil, err + } + close(conversationsChan) + for conversation := range conversationsChan { conversations = append(conversations, conversation) } return conversations, nil diff --git a/pkg/rpcclient/third.go b/pkg/rpcclient/third.go index 4c71dff6a..7cdc60d52 100644 --- a/pkg/rpcclient/third.go +++ b/pkg/rpcclient/third.go @@ -41,3 +41,7 @@ func NewThird(discov discovery.SvcDiscoveryRegistry, rpcRegisterName, grafanaUrl } return &Third{discov: discov, Client: client, conn: conn, GrafanaUrl: grafanaUrl} } +func (t *Third) DeleteOutdatedData(ctx context.Context, expires int64) error { + _, err := t.Client.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: expires}) + return err +}