diff --git a/go.mod b/go.mod index 8eae1edb2..fbeb6eabe 100644 --- a/go.mod +++ b/go.mod @@ -199,3 +199,5 @@ require ( golang.org/x/crypto v0.27.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) + +replace github.com/openimsdk/protocol => /Users/chao/Desktop/code/protocol diff --git a/go.sum b/go.sum index e572f0b47..635a85060 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,6 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.59 h1:+ycb2+68mLKPIo7VrxF0id/GXP6OqZ2/nBM1YZQr7qY= -github.com/openimsdk/protocol v0.0.72-alpha.59/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= github.com/openimsdk/tools v0.0.50-alpha.38 h1:AU6/cvDfN4ciIOwAj8IWEwze3DeEp2cHYPgW3y0OlbU= github.com/openimsdk/tools v0.0.50-alpha.38/go.mod h1:/Em/fQH46CuWf60+hcmvZyboGCQpSDEb2MdQ4nmQRAk= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 08d9d93a0..92053931c 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -92,7 +92,7 @@ func Start(ctx context.Context, index int, config *Config) error { return err } seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation) - seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB(), nil) + seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB()) if err != nil { return err } diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 0b6b656a4..f232322db 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -16,6 +16,7 @@ package conversation import ( "context" + pbmsg "github.com/openimsdk/protocol/msg" "sort" "time" @@ -437,19 +438,30 @@ func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, r } func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbconversation.SetConversationMaxSeqReq) (*pbconversation.SetConversationMaxSeqResp, error) { + if _, err := c.msgRpcClient.Client.SetUserConversationMaxSeq(ctx, &pbmsg.SetUserConversationMaxSeqReq{ConversationID: req.ConversationID, OwnerUserID: req.OwnerUserID, MaxSeq: req.MaxSeq}); err != nil { + return nil, err + } if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID, map[string]any{"max_seq": req.MaxSeq}); err != nil { return nil, err } - + for _, userID := range req.OwnerUserID { + c.conversationNotificationSender.ConversationChangeNotification(ctx, userID, []string{req.ConversationID}) + } return &pbconversation.SetConversationMaxSeqResp{}, nil } func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) { + if _, err := c.msgRpcClient.Client.SetUserConversationMinSeq(ctx, &pbmsg.SetUserConversationMinSeqReq{ConversationID: req.ConversationID, OwnerUserID: req.OwnerUserID, MinSeq: req.MinSeq}); err != nil { + return nil, err + } if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID, map[string]any{"min_seq": req.MinSeq}); err != nil { return nil, err } + for _, userID := range req.OwnerUserID { + c.conversationNotificationSender.ConversationChangeNotification(ctx, userID, []string{req.ConversationID}) + } return &pbconversation.SetConversationMinSeqResp{}, nil } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index ba0c8a42d..9e610df0f 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -964,7 +964,6 @@ func (g *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, gro if err != nil { return err } - return g.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq) } diff --git a/internal/rpc/msg/seq.go b/internal/rpc/msg/seq.go index 7f5fa1adb..bd68138fb 100644 --- a/internal/rpc/msg/seq.go +++ b/internal/rpc/msg/seq.go @@ -84,3 +84,21 @@ func (m *msgServer) GetActiveConversation(ctx context.Context, req *pbmsg.GetAct } return &pbmsg.GetActiveConversationResp{Conversations: conversations}, nil } + +func (m *msgServer) SetUserConversationMaxSeq(ctx context.Context, req *pbmsg.SetUserConversationMaxSeqReq) (*pbmsg.SetUserConversationMaxSeqResp, error) { + for _, userID := range req.OwnerUserID { + if err := m.MsgDatabase.SetUserConversationsMaxSeq(ctx, req.ConversationID, userID, req.MaxSeq); err != nil { + return nil, err + } + } + return &pbmsg.SetUserConversationMaxSeqResp{}, nil +} + +func (m *msgServer) SetUserConversationMinSeq(ctx context.Context, req *pbmsg.SetUserConversationMinSeqReq) (*pbmsg.SetUserConversationMinSeqResp, error) { + for _, userID := range req.OwnerUserID { + if err := m.MsgDatabase.SetUserConversationsMinSeq(ctx, req.ConversationID, userID, req.MinSeq); err != nil { + return nil, err + } + } + return &pbmsg.SetUserConversationMinSeqResp{}, nil +} diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 7eee2905f..758a7cf19 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -16,7 +16,6 @@ package msg import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" @@ -99,14 +98,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg return err } seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation) - seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB(), &mgo.SeqUserHook{ - SetUserMaxSeq: func(ctx context.Context, conversationID string, userID string, seq int64) error { - return conversationClient.SetConversationMaxSeq(ctx, []string{userID}, conversationID, seq) - }, - SetUserMinSeq: func(ctx context.Context, conversationID string, userID string, seq int64) error { - return conversationClient.SetConversationMinSeq(ctx, []string{userID}, conversationID, seq) - }, - }) + seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB()) if err != nil { return err } diff --git a/internal/rpc/third/log.go b/internal/rpc/third/log.go index 657ea1689..cb4678b34 100644 --- a/internal/rpc/third/log.go +++ b/internal/rpc/third/log.go @@ -55,7 +55,7 @@ func (t *thirdServer) UploadLogs(ctx context.Context, req *third.UploadLogsReq) CreateTime: time.Now(), Url: fileURL.URL, FileName: fileURL.Filename, - SystemType: req.SystemType, + SystemType: req.AppFramework, Version: req.Version, Ex: req.Ex, } diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 85b797c40..464ad7604 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -97,6 +97,9 @@ type CommonMsgDatabase interface { DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) GetDocIDs(ctx context.Context) ([]string, error) + + SetUserConversationsMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error + SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error } func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { @@ -702,6 +705,14 @@ func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, us return db.seqUser.SetUserMinSeqs(ctx, userID, seqs) } +func (db *commonMsgDatabase) SetUserConversationsMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + return db.seqUser.SetUserMaxSeq(ctx, conversationID, userID, seq) +} + +func (db *commonMsgDatabase) SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + return db.seqUser.SetUserMinSeq(ctx, conversationID, userID, seq) +} + func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error { return db.seqUser.SetUserReadSeqs(ctx, userID, hasReadSeqs) } diff --git a/pkg/common/storage/database/mgo/seq_user.go b/pkg/common/storage/database/mgo/seq_user.go index 3a5a5cccb..244de3000 100644 --- a/pkg/common/storage/database/mgo/seq_user.go +++ b/pkg/common/storage/database/mgo/seq_user.go @@ -11,14 +11,7 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" ) -type seqUserFunc func(ctx context.Context, conversationID string, userID string, seq int64) error - -type SeqUserHook struct { - SetUserMaxSeq func(ctx context.Context, conversationID string, userID string, seq int64) error - SetUserMinSeq func(ctx context.Context, conversationID string, userID string, seq int64) error -} - -func NewSeqUserMongo(db *mongo.Database, hook *SeqUserHook) (database.SeqUser, error) { +func NewSeqUserMongo(db *mongo.Database) (database.SeqUser, error) { coll := db.Collection(database.SeqUserName) _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ Keys: bson.D{ @@ -29,15 +22,11 @@ func NewSeqUserMongo(db *mongo.Database, hook *SeqUserHook) (database.SeqUser, e if err != nil { return nil, err } - if hook == nil { - hook = &SeqUserHook{} - } - return &seqUserMongo{coll: coll, hook: hook}, nil + return &seqUserMongo{coll: coll}, nil } type seqUserMongo struct { coll *mongo.Collection - hook *SeqUserHook } func (s *seqUserMongo) setSeq(ctx context.Context, conversationID string, userID string, seq int64, field string) error { @@ -63,12 +52,12 @@ func (s *seqUserMongo) setSeq(ctx context.Context, conversationID string, userID return mongoutil.UpdateOne(ctx, s.coll, filter, update, false, opt) } -func (s *seqUserMongo) getSeq(ctx context.Context, conversationID string, userID string, field string) (int64, error) { +func (s *seqUserMongo) getSeq(ctx context.Context, conversationID string, userID string, failed string) (int64, error) { filter := map[string]any{ "user_id": userID, "conversation_id": conversationID, } - opt := options.FindOne().SetProjection(bson.M{"_id": 0, field: 1}) + opt := options.FindOne().SetProjection(bson.M{"_id": 0, failed: 1}) seq, err := mongoutil.FindOne[int64](ctx, s.coll, filter, opt) if err == nil { return seq, nil @@ -83,21 +72,8 @@ func (s *seqUserMongo) GetUserMaxSeq(ctx context.Context, conversationID string, return s.getSeq(ctx, conversationID, userID, "max_seq") } -func (s *seqUserMongo) withHook(ctx context.Context, conversationID string, userID string, seq int64, field string, hookFn seqUserFunc) error { - if err := s.setSeq(ctx, conversationID, userID, seq, field); err != nil { - return err - } - if hookFn != nil { - if err := hookFn(ctx, conversationID, userID, seq); err != nil { - return err - } - } - return nil -} - func (s *seqUserMongo) SetUserMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error { - //return s.setSeq(ctx, conversationID, userID, seq, "max_seq") - return s.withHook(ctx, conversationID, userID, seq, "max_seq", s.hook.SetUserMaxSeq) + return s.setSeq(ctx, conversationID, userID, seq, "max_seq") } func (s *seqUserMongo) GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { @@ -105,8 +81,7 @@ func (s *seqUserMongo) GetUserMinSeq(ctx context.Context, conversationID string, } func (s *seqUserMongo) SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error { - //return s.setSeq(ctx, conversationID, userID, seq, "min_seq") - return s.withHook(ctx, conversationID, userID, seq, "min_seq", s.hook.SetUserMinSeq) + return s.setSeq(ctx, conversationID, userID, seq, "min_seq") } func (s *seqUserMongo) GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) { diff --git a/tools/seq/internal/main.go b/tools/seq/internal/main.go index 201fca387..2bec5a8f1 100644 --- a/tools/seq/internal/main.go +++ b/tools/seq/internal/main.go @@ -87,7 +87,7 @@ func Main(conf string, del time.Duration) error { if err != nil { return err } - uSeq, err := mgo.NewSeqUserMongo(mgocli.GetDB(), nil) + uSeq, err := mgo.NewSeqUserMongo(mgocli.GetDB()) if err != nil { return err }