From 41d5688de620ee9671b3babd5c589d042c722175 Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Fri, 1 Nov 2024 14:49:31 +0800 Subject: [PATCH 1/6] Update login policy (#2822) * fix: login Policy * fix: del login Policy --- go.mod | 2 +- go.sum | 4 +-- internal/msggateway/ws_server.go | 22 +---------------- pkg/common/storage/controller/auth.go | 35 ++++----------------------- 4 files changed, 9 insertions(+), 54 deletions(-) diff --git a/go.mod b/go.mod index 7cd4e4550..96a06729f 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.51 + github.com/openimsdk/protocol v0.0.72-alpha.53 github.com/openimsdk/tools v0.0.50-alpha.16 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 7cc45616f..92a98110b 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.51 h1:G5Yjndp/FRyOJWhoQcSF2x2GvYiAIlqN0vjkvjUPycU= -github.com/openimsdk/protocol v0.0.72-alpha.51/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.53 h1:DMzvDd418GaJJLT2Iw+AX+oNc41DROWErXDkZxB+MMM= +github.com/openimsdk/protocol v0.0.72-alpha.53/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc= github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index b92d7eb44..48e4b5cee 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -327,11 +327,6 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien switch ws.msgGatewayConfig.Share.MultiLogin.Policy { case constant.DefalutNotKick: - case constant.WebAndOther: - if constant.PlatformIDToClass(newClient.PlatformID) == constant.WebPlatformStr { - return - } - fallthrough case constant.PCAndOther: if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC { return @@ -356,7 +351,7 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien log.ZWarn(newClient.ctx, "InvalidateToken err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID) } - case constant.PcMobileAndWeb: + case constant.AllLoginButSameClassKick: clients, ok := ws.clients.GetAll(newClient.UserID) if !ok { return @@ -370,21 +365,6 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien } } kickTokenFunc(kickClients) - - case constant.SingleTerminalLogin: - clients, ok := ws.clients.GetAll(newClient.UserID) - if !ok { - return - } - var ( - kickClients []*Client - ) - for _, client := range clients { - kickClients = append(kickClients, client) - } - kickTokenFunc(kickClients) - case constant.Customize: - // todo } } diff --git a/pkg/common/storage/controller/auth.go b/pkg/common/storage/controller/auth.go index de8f93462..82d6f9476 100644 --- a/pkg/common/storage/controller/auth.go +++ b/pkg/common/storage/controller/auth.go @@ -172,17 +172,8 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string kickToken = append(kickToken, ts[len(ts)-1]) } } - case constant.SingleTerminalLogin: - for _, ts := range loginTokenMap { - kickToken = append(kickToken, ts...) - } - case constant.WebAndOther: - unkickTerminal = constant.WebPlatformStr - fallthrough case constant.PCAndOther: - if unkickTerminal == "" { - unkickTerminal = constant.TerminalPC - } + unkickTerminal = constant.TerminalPC if constant.PlatformIDToClass(platformID) != unkickTerminal { for plt, ts := range loginTokenMap { if constant.PlatformIDToClass(plt) != unkickTerminal { @@ -214,17 +205,17 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string } } } - case constant.PcMobileAndWeb: + case constant.AllLoginButSameClassKick: var ( - reserved = make(map[string]bool) + reserved = make(map[string]struct{}) ) for plt, ts := range loginTokenMap { if constant.PlatformIDToClass(plt) == constant.PlatformIDToClass(platformID) { kickToken = append(kickToken, ts...) } else { - if !reserved[constant.PlatformIDToClass(plt)] { - reserved[constant.PlatformIDToClass(plt)] = true + if _, ok := reserved[constant.PlatformIDToClass(plt)]; !ok { + reserved[constant.PlatformIDToClass(plt)] = struct{}{} kickToken = append(kickToken, ts[:len(ts)-1]...) continue } else { @@ -232,22 +223,6 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string } } } - - case constant.Customize: - if a.multiLogin.CustomizeLoginNum[platformID] <= 0 { - return nil, nil, errs.New("Do not allow login on this end").Wrap() - } - for plt, ts := range loginTokenMap { - l := len(ts) - if platformID == plt { - l++ - } - // a.multiLogin.CustomizeLoginNum[platformID] must > 0 - limit := min(a.multiLogin.CustomizeLoginNum[plt], a.multiLogin.MaxNumOneEnd) - if l > limit { - kickToken = append(kickToken, ts[:l-limit]...) - } - } default: return nil, nil, errs.New("unknown multiLogin policy").Wrap() } From 1516e9cb3af33634b53b2fb703ed3380c4976ccf Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Mon, 4 Nov 2024 14:50:07 +0800 Subject: [PATCH 2/6] build: implement version file update when release. (#2826) --- .../update-version-file-on-release.yml | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 .github/workflows/.github/workflows/update-version-file-on-release.yml diff --git a/.github/workflows/.github/workflows/update-version-file-on-release.yml b/.github/workflows/.github/workflows/update-version-file-on-release.yml new file mode 100644 index 000000000..113537fd9 --- /dev/null +++ b/.github/workflows/.github/workflows/update-version-file-on-release.yml @@ -0,0 +1,84 @@ +name: Update Version File on Release + +on: + release: + types: [created] + +jobs: + update-version: + runs-on: ubuntu-latest + env: + TAG_VERSION: ${{ github.event.release.tag_name }} + steps: + # Step 1: Checkout the original repository's code + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + # Step 2: Set up Git with official account + - name: Set up Git + run: | + git config user.name "github-actions[bot]" + git config user.email "github-actions[bot]@users.noreply.github.com" + + # Step 3: Check and delete existing tag + - name: Check and delete existing tag + run: | + if git rev-parse ${{ env.TAG_VERSION }} >/dev/null 2>&1; then + git tag -d ${{ env.TAG_VERSION }} + git push --delete origin ${{ env.TAG_VERSION }} + fi + + # Step 4: Update version file + - name: Update version file + run: | + echo "${{ env.TAG_VERSION }}" > version/version + + # Step 5: Commit and push changes + - name: Commit and push changes + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + git add version/version + git commit -m "Update version to ${{ env.TAG_VERSION }}" + git push origin HEAD:${{ github.ref }} + + # Step 6: Create and push tag + - name: Create and push tag + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + git tag ${{ env.TAG_VERSION }} + git push origin ${{ env.TAG_VERSION }} + + # Step 7: Find and Publish Draft Release + - name: Find and Publish Draft Release + uses: actions/github-script@v6 + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + script: | + // Get the list of releases + const releases = await github.rest.repos.listReleases({ + owner: context.repo.owner, + repo: context.repo.repo + }); + + // Find the draft release where the title and tag_name are the same + const draftRelease = releases.data.find(release => + release.draft && release.name === release.tag_name + ); + + if (draftRelease) { + // Publish the draft release using the release_id + await github.rest.repos.updateRelease({ + owner: context.repo.owner, + repo: context.repo.repo, + release_id: draftRelease.id, // Use release_id + draft: false + }); + + core.info(`Draft Release ${draftRelease.tag_name} published successfully.`); + } else { + core.info("No matching draft release found."); + } \ No newline at end of file From 8d308a41634b312c480a916cd91db233826e3924 Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Mon, 4 Nov 2024 16:40:39 +0800 Subject: [PATCH 3/6] feat: support stream message (#2824) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * update gomake version * update gomake version * fix: seq conversion bug * fix: redis pipe exec * fix: ImportFriends * fix: A large number of logs keysAndValues ​​length is not even * feat: mark read aggregate write * feat: online status supports redis cluster * feat: online status supports redis cluster * feat: online status supports redis cluster * merge * merge * read seq is written to mongo * read seq is written to mongo * fix: invitation to join group notification * fix: friend op_user_id * feat: optimizing asynchronous context * feat: optimizing memamq size * feat: add GetSeqMessage * feat: GroupApplicationAgreeMemberEnterNotification * feat: GroupApplicationAgreeMemberEnterNotification * feat: go.mod * feat: go.mod * feat: join group notification and get seq * feat: join group notification and get seq * feat: avoid pulling messages from sessions with a large number of max seq values of 0 * feat: API supports gzip * go.mod * fix: nil pointer error on close * fix: listen error * fix: listen error * update go.mod * feat: add log * fix: token parse token value * fix: GetMsgBySeqs boundary issues * fix: sn_ not sort * fix: sn_ not sort * fix: sn_ not sort * fix: jssdk add * fix: jssdk support * fix: jssdk support * fix: jssdk support * fix: the message I sent is not set to read seq in mongodb * fix: cannot modify group member avatars * fix: MemberEnterNotification * fix: MemberEnterNotification * fix: MsgData status * feat: stream msg * feat: support stream messages --------- Co-authored-by: withchao --- go.mod | 2 +- go.sum | 4 +- internal/api/msg.go | 10 ++ internal/api/router.go | 2 + internal/rpc/msg/notification.go | 4 + internal/rpc/msg/send.go | 5 + internal/rpc/msg/server.go | 10 +- internal/rpc/msg/stream_msg.go | 114 +++++++++++++ pkg/apistruct/msg.go | 5 + pkg/common/storage/controller/stream_msg.go | 34 ++++ pkg/common/storage/database/mgo/stream_msg.go | 60 +++++++ pkg/common/storage/database/name.go | 1 + pkg/common/storage/database/stream_msg.go | 13 ++ pkg/common/storage/model/stream_msg.go | 21 +++ tools/streammsg/main.go | 161 ++++++++++++++++++ 15 files changed, 441 insertions(+), 5 deletions(-) create mode 100644 internal/rpc/msg/stream_msg.go create mode 100644 pkg/common/storage/controller/stream_msg.go create mode 100644 pkg/common/storage/database/mgo/stream_msg.go create mode 100644 pkg/common/storage/database/stream_msg.go create mode 100644 pkg/common/storage/model/stream_msg.go create mode 100644 tools/streammsg/main.go diff --git a/go.mod b/go.mod index 96a06729f..051beb403 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.53 + github.com/openimsdk/protocol v0.0.72-alpha.54 github.com/openimsdk/tools v0.0.50-alpha.16 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 92a98110b..816c82094 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.53 h1:DMzvDd418GaJJLT2Iw+AX+oNc41DROWErXDkZxB+MMM= -github.com/openimsdk/protocol v0.0.72-alpha.53/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.54 h1:opato7N4QjjRq/SHD54bDSVBpOEEDp1VLWVk5Os2A9s= +github.com/openimsdk/protocol v0.0.72-alpha.54/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc= github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/api/msg.go b/internal/api/msg.go index bf7cb83a4..ce94b5f4f 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -173,6 +173,8 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM data = apistruct.AtElem{} case constant.Custom: data = apistruct.CustomElem{} + case constant.Stream: + data = apistruct.StreamMsgElem{} case constant.OANotification: data = apistruct.OANotificationElem{} req.SessionType = constant.NotificationChatType @@ -373,3 +375,11 @@ func (m *MessageApi) SearchMsg(c *gin.Context) { func (m *MessageApi) GetServerTime(c *gin.Context) { a2r.Call(msg.MsgClient.GetServerTime, m.Client, c) } + +func (m *MessageApi) GetStreamMsg(c *gin.Context) { + a2r.Call(msg.MsgClient.GetStreamMsg, m.Client, c) +} + +func (m *MessageApi) AppendStreamMsg(c *gin.Context) { + a2r.Call(msg.MsgClient.AppendStreamMsg, m.Client, c) +} diff --git a/internal/api/router.go b/internal/api/router.go index 560516d30..4ac301b07 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -222,6 +222,8 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En msgGroup.POST("/batch_send_msg", m.BatchSendMsg) msgGroup.POST("/check_msg_is_send_success", m.CheckMsgIsSendSuccess) msgGroup.POST("/get_server_time", m.GetServerTime) + msgGroup.POST("/get_stream_msg", m.GetStreamMsg) + msgGroup.POST("/append_stream_msg", m.AppendStreamMsg) } // Conversation conversationGroup := r.Group("/conversation") diff --git a/internal/rpc/msg/notification.go b/internal/rpc/msg/notification.go index 3b13676bf..26e3c7f46 100644 --- a/internal/rpc/msg/notification.go +++ b/internal/rpc/msg/notification.go @@ -48,3 +48,7 @@ func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conv } m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips) } + +func (m *MsgNotificationSender) StreamMsgNotification(ctx context.Context, sendID string, recvID string, sessionType int32, tips *sdkws.StreamMsgTips) { + m.NotificationWithSessionType(ctx, sendID, recvID, constant.StreamMsgNotification, sessionType, tips) +} diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 2c3f8c0a3..4762f24de 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -34,6 +34,11 @@ import ( func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { if req.MsgData != nil { m.encapsulateMsgData(req.MsgData) + if req.MsgData.ContentType == constant.Stream { + if err := m.handlerStreamMsg(ctx, req.MsgData); err != nil { + return nil, err + } + } switch req.MsgData.SessionType { case constant.SingleChatType: return m.sendMsgSingleChat(ctx, req) diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 91f41f1b1..bf8781747 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -42,8 +42,9 @@ type ( // MsgServer encapsulates dependencies required for message handling. msgServer struct { - RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration. - MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. + RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration. + MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. + StreamMsgDatabase controller.StreamMsgDatabase Conversation *rpcclient.ConversationRpcClient // RPC client for conversation service. UserLocalCache *rpccache.UserLocalCache // Local cache for user data. FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data. @@ -101,6 +102,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + streamMsg, err := mgo.NewStreamMsgMongo(mgocli.GetDB()) + if err != nil { + return err + } seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser) msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig) if err != nil { @@ -109,6 +114,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg s := &msgServer{ Conversation: &conversationClient, MsgDatabase: msgDatabase, + StreamMsgDatabase: controller.NewStreamMsgDatabase(streamMsg), RegisterCenter: client, UserLocalCache: rpccache.NewUserLocalCache(userRpcClient, &config.LocalCacheConfig, rdb), GroupLocalCache: rpccache.NewGroupLocalCache(groupRpcClient, &config.LocalCacheConfig, rdb), diff --git a/internal/rpc/msg/stream_msg.go b/internal/rpc/msg/stream_msg.go new file mode 100644 index 000000000..5db2aad48 --- /dev/null +++ b/internal/rpc/msg/stream_msg.go @@ -0,0 +1,114 @@ +package msg + +import ( + "context" + "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/errs" + "time" +) + +const StreamDeadlineTime = time.Second * 60 * 10 + +func (m *msgServer) handlerStreamMsg(ctx context.Context, msgData *sdkws.MsgData) error { + now := time.Now() + val := &model.StreamMsg{ + ClientMsgID: msgData.ClientMsgID, + ConversationID: msgprocessor.GetConversationIDByMsg(msgData), + UserID: msgData.SendID, + CreateTime: now, + DeadlineTime: now.Add(StreamDeadlineTime), + } + return m.StreamMsgDatabase.CreateStreamMsg(ctx, val) +} + +func (m *msgServer) getStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { + res, err := m.StreamMsgDatabase.GetStreamMsg(ctx, clientMsgID) + if err != nil { + return nil, err + } + now := time.Now() + if !res.End && res.DeadlineTime.Before(now) { + res.End = true + res.DeadlineTime = now + _ = m.StreamMsgDatabase.AppendStreamMsg(ctx, res.ClientMsgID, 0, nil, true, now) + } + return res, nil +} + +func (m *msgServer) AppendStreamMsg(ctx context.Context, req *msg.AppendStreamMsgReq) (*msg.AppendStreamMsgResp, error) { + res, err := m.getStreamMsg(ctx, req.ClientMsgID) + if err != nil { + return nil, err + } + if res.End { + return nil, errs.ErrNoPermission.WrapMsg("stream msg is end") + } + if len(res.Packets) < int(req.StartIndex) { + return nil, errs.ErrNoPermission.WrapMsg("start index is invalid") + } + if val := len(res.Packets) - int(req.StartIndex); val > 0 { + exist := res.Packets[int(req.StartIndex):] + for i, s := range exist { + if len(req.Packets) == 0 { + break + } + if s != req.Packets[i] { + return nil, errs.ErrNoPermission.WrapMsg(fmt.Sprintf("packet %d has been written and is inconsistent", i)) + } + req.StartIndex++ + req.Packets = req.Packets[1:] + } + } + if len(req.Packets) == 0 && res.End == req.End { + return &msg.AppendStreamMsgResp{}, nil + } + deadlineTime := time.Now().Add(StreamDeadlineTime) + if err := m.StreamMsgDatabase.AppendStreamMsg(ctx, req.ClientMsgID, int(req.StartIndex), req.Packets, req.End, deadlineTime); err != nil { + return nil, err + } + conversation, err := m.Conversation.GetConversation(ctx, res.UserID, res.ConversationID) + if err != nil { + return nil, err + } + tips := &sdkws.StreamMsgTips{ + ConversationID: res.ConversationID, + ClientMsgID: res.ClientMsgID, + StartIndex: req.StartIndex, + Packets: req.Packets, + End: req.End, + } + var ( + recvID string + sessionType int32 + ) + if conversation.GroupID == "" { + sessionType = constant.SingleChatType + recvID = conversation.UserID + } else { + sessionType = constant.ReadGroupChatType + recvID = conversation.GroupID + } + m.msgNotificationSender.StreamMsgNotification(ctx, res.UserID, recvID, sessionType, tips) + return &msg.AppendStreamMsgResp{}, nil +} + +func (m *msgServer) GetStreamMsg(ctx context.Context, req *msg.GetStreamMsgReq) (*msg.GetStreamMsgResp, error) { + res, err := m.getStreamMsg(ctx, req.ClientMsgID) + if err != nil { + return nil, err + } + return &msg.GetStreamMsgResp{ + ClientMsgID: res.ClientMsgID, + ConversationID: res.ConversationID, + UserID: res.UserID, + Packets: res.Packets, + End: res.End, + CreateTime: res.CreateTime.UnixMilli(), + DeadlineTime: res.DeadlineTime.UnixMilli(), + }, nil +} diff --git a/pkg/apistruct/msg.go b/pkg/apistruct/msg.go index dc20b5104..dda3ff317 100644 --- a/pkg/apistruct/msg.go +++ b/pkg/apistruct/msg.go @@ -81,6 +81,11 @@ type TextElem struct { Content string `json:"content" validate:"required"` } +type StreamMsgElem struct { + Type string `mapstructure:"type" validate:"required"` + Content string `mapstructure:"content" validate:"required"` +} + type RevokeElem struct { RevokeMsgClientID string `mapstructure:"revokeMsgClientID" validate:"required"` } diff --git a/pkg/common/storage/controller/stream_msg.go b/pkg/common/storage/controller/stream_msg.go new file mode 100644 index 000000000..3409ccd93 --- /dev/null +++ b/pkg/common/storage/controller/stream_msg.go @@ -0,0 +1,34 @@ +package controller + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "time" +) + +type StreamMsgDatabase interface { + CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error + AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error + GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) +} + +func NewStreamMsgDatabase(db database.StreamMsg) StreamMsgDatabase { + return &streamMsgDatabase{db: db} +} + +type streamMsgDatabase struct { + db database.StreamMsg +} + +func (m *streamMsgDatabase) CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error { + return m.db.CreateStreamMsg(ctx, model) +} + +func (m *streamMsgDatabase) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error { + return m.db.AppendStreamMsg(ctx, clientMsgID, startIndex, packets, end, deadlineTime) +} + +func (m *streamMsgDatabase) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { + return m.db.GetStreamMsg(ctx, clientMsgID) +} diff --git a/pkg/common/storage/database/mgo/stream_msg.go b/pkg/common/storage/database/mgo/stream_msg.go new file mode 100644 index 000000000..c57798daa --- /dev/null +++ b/pkg/common/storage/database/mgo/stream_msg.go @@ -0,0 +1,60 @@ +package mgo + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/errs" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "time" +) + +func NewStreamMsgMongo(db *mongo.Database) (*StreamMsgMongo, error) { + coll := db.Collection(database.StreamMsgName) + _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ + Keys: bson.D{ + {Key: "client_msg_id", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }) + if err != nil { + return nil, errs.Wrap(err) + } + return &StreamMsgMongo{coll: coll}, nil +} + +type StreamMsgMongo struct { + coll *mongo.Collection +} + +func (m *StreamMsgMongo) CreateStreamMsg(ctx context.Context, val *model.StreamMsg) error { + if val.Packets == nil { + val.Packets = []string{} + } + return mongoutil.InsertMany(ctx, m.coll, []*model.StreamMsg{val}) +} + +func (m *StreamMsgMongo) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error { + update := bson.M{ + "$set": bson.M{ + "end": end, + "deadline_time": deadlineTime, + }, + } + if len(packets) > 0 { + update["$push"] = bson.M{ + "packets": bson.M{ + "$each": packets, + "$position": startIndex, + }, + } + } + return mongoutil.UpdateOne(ctx, m.coll, bson.M{"client_msg_id": clientMsgID, "end": false}, update, true) +} + +func (m *StreamMsgMongo) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { + return mongoutil.FindOne[*model.StreamMsg](ctx, m.coll, bson.M{"client_msg_id": clientMsgID}) +} diff --git a/pkg/common/storage/database/name.go b/pkg/common/storage/database/name.go index 748bd844d..9742f933f 100644 --- a/pkg/common/storage/database/name.go +++ b/pkg/common/storage/database/name.go @@ -17,4 +17,5 @@ const ( UserName = "user" SeqConversationName = "seq" SeqUserName = "seq_user" + StreamMsgName = "stream_msg" ) diff --git a/pkg/common/storage/database/stream_msg.go b/pkg/common/storage/database/stream_msg.go new file mode 100644 index 000000000..e83fffbaa --- /dev/null +++ b/pkg/common/storage/database/stream_msg.go @@ -0,0 +1,13 @@ +package database + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "time" +) + +type StreamMsg interface { + CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error + AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error + GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) +} diff --git a/pkg/common/storage/model/stream_msg.go b/pkg/common/storage/model/stream_msg.go new file mode 100644 index 000000000..c040426a4 --- /dev/null +++ b/pkg/common/storage/model/stream_msg.go @@ -0,0 +1,21 @@ +package model + +import ( + "time" +) + +const ( + StreamMsgStatusWait = 0 + StreamMsgStatusDone = 1 + StreamMsgStatusFail = 2 +) + +type StreamMsg struct { + ClientMsgID string `bson:"client_msg_id"` + ConversationID string `bson:"conversation_id"` + UserID string `bson:"user_id"` + Packets []string `bson:"packets"` + End bool `bson:"end"` + CreateTime time.Time `bson:"create_time"` + DeadlineTime time.Time `bson:"deadline_time"` +} diff --git a/tools/streammsg/main.go b/tools/streammsg/main.go new file mode 100644 index 000000000..bb567e233 --- /dev/null +++ b/tools/streammsg/main.go @@ -0,0 +1,161 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "github.com/gin-gonic/gin" + "github.com/google/uuid" + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" + cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" + "github.com/openimsdk/protocol/auth" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/tools/apiresp" + "github.com/openimsdk/tools/errs" + "io" + "net/http" + "strings" + "time" +) + +const ( + getAdminToken = "/auth/get_admin_token" + sendMsgApi = "/msg/send_msg" + appendStreamMsg = "/msg/append_stream_msg" +) + +var ( + ApiAddr = "http://127.0.0.1:10002" + Token string +) + +func ApiCall[R any](api string, req any) (*R, error) { + data, err := json.Marshal(req) + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + request, err := http.NewRequestWithContext(ctx, http.MethodPost, ApiAddr+api, bytes.NewBuffer(data)) + if err != nil { + return nil, err + } + if Token != "" { + request.Header.Set("token", Token) + } + request.Header.Set(constant.OperationID, uuid.New().String()) + response, err := http.DefaultClient.Do(request) + if err != nil { + return nil, err + } + defer response.Body.Close() + var resp R + apiResponse := apiresp.ApiResponse{ + Data: &resp, + } + if err := json.NewDecoder(response.Body).Decode(&apiResponse); err != nil { + return nil, err + } + if apiResponse.ErrCode != 0 { + return nil, errs.NewCodeError(apiResponse.ErrCode, apiResponse.ErrMsg) + } + return &resp, nil +} + +func main() { + resp, err := ApiCall[auth.GetAdminTokenResp](getAdminToken, &auth.GetAdminTokenReq{ + Secret: "openIM123", + UserID: "imAdmin", + }) + if err != nil { + fmt.Println("get admin token failed", err) + return + } + Token = resp.Token + g := gin.Default() + g.POST("/callbackExample/callbackAfterSendSingleMsgCommand", toGin(handlerUserMsg)) + if err := g.Run(":10006"); err != nil { + panic(err) + } +} + +func toGin[R any](fn func(c *gin.Context, req *R) error) gin.HandlerFunc { + return func(c *gin.Context) { + body, err := io.ReadAll(c.Request.Body) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + fmt.Printf("HTTP %s %s %s\n", c.Request.Method, c.Request.URL, body) + var req R + if err := json.Unmarshal(body, &req); err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + if err := fn(c, &req); err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.String(http.StatusOK, "{}") + } +} + +func handlerUserMsg(c *gin.Context, req *cbapi.CallbackAfterSendSingleMsgReq) error { + if req.ContentType != constant.Text { + return nil + } + if !strings.Contains(req.Content, "stream") { + return nil + } + apiReq := apistruct.SendMsgReq{ + RecvID: req.SendID, + SendMsg: apistruct.SendMsg{ + SendID: req.RecvID, + SenderNickname: "xxx", + SenderFaceURL: "", + SenderPlatformID: constant.AdminPlatformID, + ContentType: constant.Stream, + SessionType: req.SessionType, + SendTime: time.Now().UnixMilli(), + Content: map[string]any{ + "type": "xxx", + "content": "server test stream msg", + }, + }, + } + go func() { + if err := doPushStreamMsg(&apiReq); err != nil { + fmt.Println("doPushStreamMsg failed", err) + return + } + fmt.Println("doPushStreamMsg success") + }() + return nil +} + +func doPushStreamMsg(sendReq *apistruct.SendMsgReq) error { + resp, err := ApiCall[msg.SendMsgResp](sendMsgApi, sendReq) + if err != nil { + return err + } + const num = 5 + for i := 1; i <= num; i++ { + _, err := ApiCall[msg.AppendStreamMsgResp](appendStreamMsg, &msg.AppendStreamMsgReq{ + ClientMsgID: resp.ClientMsgID, + StartIndex: int64(i - 1), + Packets: []string{ + fmt.Sprintf("stream_msg_packet_%03d", i), + }, + End: i == num, + }) + if err != nil { + fmt.Println("append stream msg failed", "clientMsgID", resp.ClientMsgID, "index", fmt.Sprintf("%d/%d", i, num), "error", err) + return err + } + fmt.Println("append stream msg success", "clientMsgID", resp.ClientMsgID, "index", fmt.Sprintf("%d/%d", i, num)) + time.Sleep(time.Second * 10) + } + return nil +} From 3ebd1bbaa4d8b72667dea395a9df2428587bbc48 Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Tue, 5 Nov 2024 15:33:15 +0800 Subject: [PATCH 4/6] fix: write msg to redis (#2836) --- pkg/common/storage/cache/redis/msg.go | 14 -------------- pkg/common/storage/controller/msg.go | 10 ++++++++++ 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/pkg/common/storage/cache/redis/msg.go b/pkg/common/storage/cache/redis/msg.go index 30f367bb7..b04bc5c35 100644 --- a/pkg/common/storage/cache/redis/msg.go +++ b/pkg/common/storage/cache/redis/msg.go @@ -1,17 +1,3 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package redis import ( diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index d579069b6..90b479064 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -443,6 +443,11 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin return 0, 0, nil, err } successMsgs = append(mongoMsgs, successMsgs...) + + _, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs) + if err != nil { + return 0, 0, nil, err + } } return minSeq, maxSeq, successMsgs, nil @@ -500,6 +505,11 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co } successMsgs = append(successMsgs, mongoMsgs...) + + _, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs) + if err != nil { + return 0, 0, nil, err + } } return minSeq, maxSeq, successMsgs, nil } From ad8ae5f5f0faf643ba022991f964fe31544c3979 Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Thu, 7 Nov 2024 14:59:31 +0800 Subject: [PATCH 5/6] fix: get group return repeated result (#2842) --- pkg/common/storage/database/mgo/group.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/common/storage/database/mgo/group.go b/pkg/common/storage/database/mgo/group.go index 3be7883af..620269b43 100644 --- a/pkg/common/storage/database/mgo/group.go +++ b/pkg/common/storage/database/mgo/group.go @@ -76,7 +76,7 @@ func (g *GroupMgo) Take(ctx context.Context, groupID string) (group *model.Group func (g *GroupMgo) Search(ctx context.Context, keyword string, pagination pagination.Pagination) (total int64, groups []*model.Group, err error) { // Define the sorting options - opts := options.Find().SetSort(bson.D{{Key: "created_at", Value: -1}}) + opts := options.Find().SetSort(bson.D{{Key: "create_time", Value: -1}}) // Perform the search with pagination and sorting return mongoutil.FindPage[*model.Group](ctx, g.coll, bson.M{ From 68698961f18a2aac751d5422f5924f7a637d7aff Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Thu, 7 Nov 2024 18:11:43 +0800 Subject: [PATCH 6/6] fix: SetConversations can update new conversation (#2838) --- internal/rpc/conversation/conversation.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 6f6ca1f67..0c8a6fd85 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -261,27 +261,35 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver setConversationFieldsFunc := func() { if req.Conversation.RecvMsgOpt != nil { + conversation.RecvMsgOpt = req.Conversation.RecvMsgOpt.Value m["recv_msg_opt"] = req.Conversation.RecvMsgOpt.Value } if req.Conversation.AttachedInfo != nil { + conversation.AttachedInfo = req.Conversation.AttachedInfo.Value m["attached_info"] = req.Conversation.AttachedInfo.Value } if req.Conversation.Ex != nil { + conversation.Ex = req.Conversation.Ex.Value m["ex"] = req.Conversation.Ex.Value } if req.Conversation.IsPinned != nil { + conversation.IsPinned = req.Conversation.IsPinned.Value m["is_pinned"] = req.Conversation.IsPinned.Value } if req.Conversation.GroupAtType != nil { + conversation.GroupAtType = req.Conversation.GroupAtType.Value m["group_at_type"] = req.Conversation.GroupAtType.Value } if req.Conversation.MsgDestructTime != nil { + conversation.MsgDestructTime = req.Conversation.MsgDestructTime.Value m["msg_destruct_time"] = req.Conversation.MsgDestructTime.Value } if req.Conversation.IsMsgDestruct != nil { + conversation.IsMsgDestruct = req.Conversation.IsMsgDestruct.Value m["is_msg_destruct"] = req.Conversation.IsMsgDestruct.Value } if req.Conversation.BurnDuration != nil { + conversation.BurnDuration = req.Conversation.BurnDuration.Value m["burn_duration"] = req.Conversation.BurnDuration.Value } }