From d20a95bb0867d964f45e38f7a659e80f731414e2 Mon Sep 17 00:00:00 2001 From: OpenIM-Robot Date: Thu, 13 Feb 2025 14:00:48 +0800 Subject: [PATCH 1/9] fix: the source message of the reference is withdrawn, and the referenced message is deleted (#3137) (#3140) * pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * fix * fix * optimize log output * feat: support GetLastMessage * feat: support GetLastMessage * feat: s3 switch * feat: s3 switch * fix: GetUsersOnline * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: seq conversion failed without exiting * fix: DeleteDoc crash * fix: fill send time * fix: fill send time * fix: crash caused by withdrawing messages from users who have left the group * fix: user msg timestamp * seq read config * seq read config * fix: the source message of the reference is withdrawn, and the referenced message is deleted Co-authored-by: chao <48119764+withchao@users.noreply.github.com> --- internal/msgtransfer/online_msg_to_mongo_handler.go | 12 ++++++++---- pkg/common/storage/controller/msg.go | 2 +- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index d8836d54e..8405be7fe 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -73,10 +73,14 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont } else { prommetrics.MsgInsertMongoSuccessCounter.Inc() } - var seqs []int64 - for _, msg := range msgFromMQ.MsgData { - seqs = append(seqs, msg.Seq) - } + //var seqs []int64 + //for _, msg := range msgFromMQ.MsgData { + // seqs = append(seqs, msg.Seq) + //} + //if err := mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs); err != nil { + // log.ZError(ctx, "remove cache msg from redis err", err, "msg", + // msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) + //} } func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index b92f9b510..0069dc7cc 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -310,7 +310,7 @@ func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][ log.ZError(ctx, "json.Unmarshal", err) return } - if quoteMsg.QuoteMessage == nil || quoteMsg.QuoteMessage.Content == "" { + if quoteMsg.QuoteMessage == nil { return } if quoteMsg.QuoteMessage.Content == "e30=" { From aad2d01681d13671c6981e12b43acb7af2500bf9 Mon Sep 17 00:00:00 2001 From: OpenIM-Robot Date: Tue, 25 Feb 2025 20:17:56 +0800 Subject: [PATCH 2/9] fix: Offline push does not have a badge && Android offline push (#3146) (#3149) * fix: offline push can display badge * feat: strategy * feat: log * feat: log * chore: offlinepush * fix: offlinepush * fix: log Co-authored-by: icey-yu <119291641+icey-yu@users.noreply.github.com> --- internal/push/offlinepush/fcm/push.go | 8 +++-- internal/push/offlinepush/getui/body.go | 38 ++++++++++++++++++-- internal/push/offlinepush/getui/push.go | 8 +++-- pkg/common/storage/cache/redis/lua_script.go | 4 ++- 4 files changed, 49 insertions(+), 9 deletions(-) diff --git a/internal/push/offlinepush/fcm/push.go b/internal/push/offlinepush/fcm/push.go index 6e8355af3..463b72759 100644 --- a/internal/push/offlinepush/fcm/push.go +++ b/internal/push/offlinepush/fcm/push.go @@ -16,12 +16,14 @@ package fcm import ( "context" + "errors" "fmt" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" - "github.com/openimsdk/tools/utils/httputil" "path/filepath" "strings" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" + "github.com/openimsdk/tools/utils/httputil" + firebase "firebase.google.com/go/v4" "firebase.google.com/go/v4/messaging" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -133,7 +135,7 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, unreadCountSum, err := f.cache.GetUserBadgeUnreadCountSum(ctx, userID) if err == nil && unreadCountSum != 0 { apns.Payload.Aps.Badge = &unreadCountSum - } else if err == redis.Nil || unreadCountSum == 0 { + } else if errors.Is(err, redis.Nil) || unreadCountSum == 0 { zero := 1 apns.Payload.Aps.Badge = &zero } else { diff --git a/internal/push/offlinepush/getui/body.go b/internal/push/offlinepush/getui/body.go index a96ff4efc..2f2469b4f 100644 --- a/internal/push/offlinepush/getui/body.go +++ b/internal/push/offlinepush/getui/body.go @@ -18,6 +18,16 @@ import ( "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/utils/datautil" +) + +var ( + incOne = datautil.ToPtr("+1") + addNum = "1" + defaultStrategy = strategy{ + Default: 1, + } + msgCategory = "CATEGORY_MESSAGE" ) type Resp struct { @@ -58,7 +68,24 @@ type TaskResp struct { } type Settings struct { - TTL *int64 `json:"ttl"` + TTL *int64 `json:"ttl"` + Strategy strategy `json:"strategy"` +} + +type strategy struct { + Default int64 `json:"default"` + //IOS int64 `json:"ios"` + //St int64 `json:"st"` + //Hw int64 `json:"hw"` + //Ho int64 `json:"ho"` + //XM int64 `json:"xm"` + //XMG int64 `json:"xmg"` + //VV int64 `json:"vv"` + //Op int64 `json:"op"` + //OpG int64 `json:"opg"` + //MZ int64 `json:"mz"` + //HosHw int64 `json:"hoshw"` + //WX int64 `json:"wx"` } type Audience struct { @@ -112,6 +139,8 @@ type Notification struct { ChannelID string `json:"channelID"` ChannelName string `json:"ChannelName"` ClickType string `json:"click_type"` + BadgeAddNum string `json:"badge_add_num"` + Category string `json:"category"` } type Options struct { @@ -120,6 +149,7 @@ type Options struct { ChannelID string `json:"/message/android/notification/channel_id"` Sound string `json:"/message/android/notification/sound"` Importance string `json:"/message/android/notification/importance"` + Category string `json:"/message/android/category"` } `json:"HW"` XM struct { ChannelID string `json:"/extra.channel_id"` @@ -140,6 +170,8 @@ func newPushReq(pushConf *config.Push, title, content string) PushReq { ClickType: "startapp", ChannelID: pushConf.GeTui.ChannelID, ChannelName: pushConf.GeTui.ChannelName, + BadgeAddNum: addNum, + Category: msgCategory, }}} return pushReq } @@ -156,6 +188,7 @@ func (pushReq *PushReq) setPushChannel(title string, body string) { notify := "notify" pushReq.PushChannel.Ios.NotificationType = ¬ify pushReq.PushChannel.Ios.Aps.Sound = "default" + pushReq.PushChannel.Ios.AutoBadge = incOne pushReq.PushChannel.Ios.Aps.Alert = Alert{ Title: title, Body: body, @@ -172,7 +205,8 @@ func (pushReq *PushReq) setPushChannel(title string, body string) { ChannelID string `json:"/message/android/notification/channel_id"` Sound string `json:"/message/android/notification/sound"` Importance string `json:"/message/android/notification/importance"` - }{ChannelID: "RingRing4", Sound: "/raw/ring001", Importance: "NORMAL"}, + Category string `json:"/message/android/category"` + }{ChannelID: "RingRing4", Sound: "/raw/ring001", Importance: "NORMAL", Category: "IM"}, XM: struct { ChannelID string `json:"/extra.channel_id"` }{ChannelID: "high_system"}, diff --git a/internal/push/offlinepush/getui/push.go b/internal/push/offlinepush/getui/push.go index e266f9c46..e82b62c7a 100644 --- a/internal/push/offlinepush/getui/push.go +++ b/internal/push/offlinepush/getui/push.go @@ -18,6 +18,7 @@ import ( "context" "crypto/sha256" "encoding/hex" + "errors" "strconv" "sync" "time" @@ -70,7 +71,7 @@ func NewClient(pushConf *config.Push, cache cache.ThirdCache) *Client { func (g *Client) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error { token, err := g.cache.GetGetuiToken(ctx) if err != nil { - if errs.Unwrap(err) == redis.Nil { + if errors.Is(err, redis.Nil) { log.ZDebug(ctx, "getui token not exist in redis") token, err = g.getTokenAndSave2Redis(ctx) if err != nil { @@ -144,7 +145,7 @@ func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expir func (g *Client) GetTaskID(ctx context.Context, token string, pushReq PushReq) (string, error) { respTask := TaskResp{} ttl := int64(1000 * 60 * 5) - pushReq.Settings = &Settings{TTL: &ttl} + pushReq.Settings = &Settings{TTL: &ttl, Strategy: defaultStrategy} err := g.request(ctx, taskURL, pushReq, token, &respTask) if err != nil { return "", errs.Wrap(err) @@ -188,6 +189,7 @@ func (g *Client) postReturn( if err != nil { return err } + log.ZDebug(ctx, "postReturn", "url", url, "header", header, "input", input, "timeout", timeout, "output", output) return output.parseError() } @@ -204,7 +206,7 @@ func (g *Client) getTokenAndSave2Redis(ctx context.Context) (token string, err e } func (g *Client) GetTaskIDAndSave2Redis(ctx context.Context, token string, pushReq PushReq) (taskID string, err error) { - pushReq.Settings = &Settings{TTL: &g.taskIDTTL} + pushReq.Settings = &Settings{TTL: &g.taskIDTTL, Strategy: defaultStrategy} taskID, err = g.GetTaskID(ctx, token, pushReq) if err != nil { return diff --git a/pkg/common/storage/cache/redis/lua_script.go b/pkg/common/storage/cache/redis/lua_script.go index c7609cb44..acf7eaa79 100644 --- a/pkg/common/storage/cache/redis/lua_script.go +++ b/pkg/common/storage/cache/redis/lua_script.go @@ -2,7 +2,9 @@ package redis import ( "context" + "errors" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -56,7 +58,7 @@ func callLua(ctx context.Context, rdb redis.Scripter, script *redis.Script, keys } } v, err := r.Result() - if err == redis.Nil { + if errors.Is(err, redis.Nil) { err = nil } return v, errs.WrapMsg(err, "call lua err", "scriptHash", script.Hash(), "keys", keys, "args", args) From 8d9c826fe44144c58ff8b2f112349fc523f311fa Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Thu, 27 Feb 2025 16:17:00 +0800 Subject: [PATCH 3/9] feat: the default notification.yml is not configured properly (#3168) * pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * fix * fix * optimize log output * feat: support GetLastMessage * feat: support GetLastMessage * feat: s3 switch * feat: s3 switch * fix: GetUsersOnline * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: seq conversion failed without exiting * fix: DeleteDoc crash * fix: fill send time * fix: fill send time * fix: crash caused by withdrawing messages from users who have left the group * fix: user msg timestamp * seq read config * seq read config * fix: the source message of the reference is withdrawn, and the referenced message is deleted * feat: optimize the default notification.yml * fix: shouldPushOffline --- config/notification.yml | 26 +++++++++++++------------- internal/push/push_handler.go | 5 ++++- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/config/notification.yml b/config/notification.yml index 9445a67c6..0376fefb7 100644 --- a/config/notification.yml +++ b/config/notification.yml @@ -31,7 +31,7 @@ joinGroupApplication: reliabilityLevel: 1 unreadCount: false offlinePush: - enable: false + enable: true title: joinGroupApplication title desc: joinGroupApplication desc ext: joinGroupApplication ext @@ -51,7 +51,7 @@ groupApplicationAccepted: reliabilityLevel: 1 unreadCount: false offlinePush: - enable: false + enable: true title: groupApplicationAccepted title desc: groupApplicationAccepted desc ext: groupApplicationAccepted ext @@ -61,7 +61,7 @@ groupApplicationRejected: reliabilityLevel: 1 unreadCount: false offlinePush: - enable: false + enable: true title: groupApplicationRejected title desc: groupApplicationRejected desc ext: groupApplicationRejected ext @@ -198,7 +198,7 @@ friendApplicationAdded: reliabilityLevel: 1 unreadCount: false offlinePush: - enable: false + enable: true title: Somebody applies to add you as a friend desc: Somebody applies to add you as a friend ext: Somebody applies to add you as a friend @@ -228,7 +228,7 @@ friendAdded: reliabilityLevel: 1 unreadCount: false offlinePush: - enable: true + enable: false title: We have become friends desc: We have become friends ext: We have become friends @@ -238,7 +238,7 @@ friendDeleted: reliabilityLevel: 1 unreadCount: false offlinePush: - enable: true + enable: false title: deleted a friend desc: deleted a friend ext: deleted a friend @@ -248,7 +248,7 @@ friendRemarkSet: reliabilityLevel: 1 unreadCount: false offlinePush: - enable: true + enable: false title: Your friend's profile has been changed desc: Your friend's profile has been changed ext: Your friend's profile has been changed @@ -258,7 +258,7 @@ blackAdded: reliabilityLevel: 1 unreadCount: false offlinePush: - enable: true + enable: false title: blocked a user desc: blocked a user ext: blocked a user @@ -268,7 +268,7 @@ blackDeleted: reliabilityLevel: 1 unreadCount: false offlinePush: - enable: true + enable: false title: Remove a blocked user desc: Remove a blocked user ext: Remove a blocked user @@ -278,7 +278,7 @@ friendInfoUpdated: reliabilityLevel: 1 unreadCount: false offlinePush: - enable: true + enable: false title: friend info updated desc: friend info updated ext: friend info updated @@ -289,7 +289,7 @@ userInfoUpdated: reliabilityLevel: 1 unreadCount: false offlinePush: - enable: true + enable: false title: userInfo updated desc: userInfo updated ext: userInfo updated @@ -310,7 +310,7 @@ conversationChanged: reliabilityLevel: 1 unreadCount: false offlinePush: - enable: true + enable: false title: conversation changed desc: conversation changed ext: conversation changed @@ -320,7 +320,7 @@ conversationSetPrivate: reliabilityLevel: 1 unreadCount: false offlinePush: - enable: true + enable: false title: burn after reading desc: burn after reading ext: burn after reading \ No newline at end of file diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 80d14499f..826a6e944 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -208,7 +208,10 @@ func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgDat if !isOfflinePush { return false } - if msg.ContentType == constant.SignalingNotification { + switch msg.ContentType { + case constant.RoomParticipantsConnectedNotification: + return false + case constant.RoomParticipantsDisconnectedNotification: return false } return true From 72cb20f1fe97c2ab8e67bde5b44d2dd1ed12be50 Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Thu, 20 Feb 2025 16:13:47 +0800 Subject: [PATCH 4/9] feat: Change after webhook filter && feat SendSimpleMsg (#3151) * feat: msg filter and search system account * feat: search system account * chore: msg * chore: msg * chore: msg * chore: webhook filter && sendSimpleMessage --- config/webhooks.yml | 16 ++--- go.mod | 4 +- go.sum | 12 ++-- internal/api/msg.go | 81 +++++++++++++++++++++++++ internal/rpc/msg/callback.go | 21 ++++++- internal/rpc/msg/filter.go | 46 ++++++++++---- internal/rpc/user/user.go | 26 +++++--- pkg/apistruct/manage.go | 15 +++++ pkg/common/config/config.go | 18 +++--- pkg/common/storage/controller/user.go | 9 ++- pkg/common/storage/database/mgo/user.go | 7 ++- pkg/common/storage/database/user.go | 4 +- pkg/common/webhook/http_client.go | 50 ++++++++++++++- 13 files changed, 255 insertions(+), 54 deletions(-) diff --git a/config/webhooks.yml b/config/webhooks.yml index 854d2dc2c..41c60e7e2 100644 --- a/config/webhooks.yml +++ b/config/webhooks.yml @@ -3,14 +3,7 @@ beforeSendSingleMsg: enable: false timeout: 5 failedContinue: true - # Only the contentType in allowedTypes will send the callback. - # Supports two formats: a single type or a range. The range is defined by the lower and upper bounds connected with a hyphen ("-"). - # e.g. allowedTypes: [1, 100, 200-500, 600-700] means that only contentType within the range - # {1, 100} ∪ [200, 500] ∪ [600, 700] will be allowed through the filter. - # If not set, all contentType messages will through this filter. - allowedTypes: [] # Only the contentType not in deniedTypes will send the callback. - # Supports two formats, same as allowedTypes. # If not set, all contentType messages will through this filter. deniedTypes: [] beforeUpdateUserInfoEx: @@ -23,31 +16,30 @@ afterUpdateUserInfoEx: afterSendSingleMsg: enable: false timeout: 5 - # Only the senID/recvID specified in attentionIds will send the callback + # Only the recvID specified in attentionIds will send the callback # if not set, all user messages will be callback attentionIds: [] # See beforeSendSingleMsg comment. - allowedTypes: [] deniedTypes: [] beforeSendGroupMsg: enable: false timeout: 5 failedContinue: true # See beforeSendSingleMsg comment. - allowedTypes: [] deniedTypes: [] beforeMsgModify: enable: false timeout: 5 failedContinue: true # See beforeSendSingleMsg comment. - allowedTypes: [] deniedTypes: [] afterSendGroupMsg: enable: false timeout: 5 + # Only the recvID specified in attentionIds will send the callback + # if not set, all user messages will be callback + attentionIds: [] # See beforeSendSingleMsg comment. - allowedTypes: [] deniedTypes: [] afterUserOnline: enable: false diff --git a/go.mod b/go.mod index 9a66d7cea..960bc56ce 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.71 - github.com/openimsdk/tools v0.0.50-alpha.72 + github.com/openimsdk/protocol v0.0.72-alpha.74 + github.com/openimsdk/tools v0.0.50-alpha.74 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 6bd4ecde0..63dfe9236 100644 --- a/go.sum +++ b/go.sum @@ -345,12 +345,12 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= -github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.71 h1:R3utzOlqepaJWTAmnfJi4ccUM/XIoFasSyjQMOipM70= -github.com/openimsdk/protocol v0.0.72-alpha.71/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.72 h1:d/vaZjIfvrNp3EeRJEIiamBO7HiPx6CP4wiuq8NpfzY= -github.com/openimsdk/tools v0.0.50-alpha.72/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= +github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= +github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +github.com/openimsdk/protocol v0.0.72-alpha.74 h1:cGycdzEOxjPuaeoQhIWEKKVf5zp1I+wx7ZnBemjCJJI= +github.com/openimsdk/protocol v0.0.72-alpha.74/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= +github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/msg.go b/internal/api/msg.go index 0b73af0cb..5503527a5 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -15,12 +15,16 @@ package api import ( + "encoding/base64" + "encoding/json" + "github.com/gin-gonic/gin" "github.com/go-playground/validator/v10" "github.com/mitchellh/mapstructure" "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msg" @@ -368,6 +372,83 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) { apiresp.GinSuccess(c, resp) } +func (m *MessageApi) SendSimpleMessage(c *gin.Context) { + encodedKey, ok := c.GetQuery(webhook.Key) + if !ok { + apiresp.GinError(c, errs.ErrArgs.WithDetail("missing key in query").Wrap()) + return + } + + decodedData, err := base64.StdEncoding.DecodeString(encodedKey) + if err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + var ( + req apistruct.SendSingleMsgReq + keyMsgData apistruct.KeyMsgData + + sendID string + sessionType int32 + recvID string + ) + err = json.Unmarshal(decodedData, &keyMsgData) + if err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + if keyMsgData.GroupID != "" { + sessionType = constant.ReadGroupChatType + sendID = req.SendID + } else { + sessionType = constant.SingleChatType + sendID = keyMsgData.RecvID + recvID = keyMsgData.SendID + } + // check param + if keyMsgData.SendID == "" { + apiresp.GinError(c, errs.ErrArgs.WithDetail("missing recvID or GroupID").Wrap()) + return + } + if sendID == "" { + apiresp.GinError(c, errs.ErrArgs.WithDetail("missing sendID").Wrap()) + return + } + + msgData := &sdkws.MsgData{ + SendID: sendID, + RecvID: recvID, + GroupID: keyMsgData.GroupID, + ClientMsgID: idutil.GetMsgIDByMD5(sendID), + SenderPlatformID: constant.AdminPlatformID, + SessionType: sessionType, + MsgFrom: constant.UserMsgType, + ContentType: constant.Text, + Content: []byte(req.Content), + OfflinePushInfo: req.OfflinePushInfo, + Ex: req.Ex, + } + + respPb, err := m.Client.SendMsg(c, &msg.SendMsgReq{MsgData: msgData}) + if err != nil { + apiresp.GinError(c, err) + return + } + + var status = constant.MsgSendSuccessed + + _, err = m.Client.SetSendMsgStatus(c, &msg.SetSendMsgStatusReq{ + Status: int32(status), + }) + + if err != nil { + apiresp.GinError(c, err) + return + } + + apiresp.GinSuccess(c, respPb) +} + func (m *MessageApi) CheckMsgIsSendSuccess(c *gin.Context) { a2r.Call(c, msg.MsgClient.GetSendMsgStatus, m.Client) } diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index c66dd6ca9..adf1ff735 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -16,7 +16,11 @@ package msg import ( "context" + "encoding/base64" + + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/tools/utils/stringutil" cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -94,7 +98,7 @@ func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), RecvID: msg.MsgData.RecvID, } - m.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after) + m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) } func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { @@ -128,7 +132,8 @@ func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config. CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), GroupID: msg.MsgData.GroupID, } - m.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after) + + m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) } func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { @@ -192,3 +197,15 @@ func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.Aft } m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after) } + +func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { + keyMsgData := apistruct.KeyMsgData{ + SendID: msg.SendID, + RecvID: msg.RecvID, + GroupID: msg.GroupID, + } + + return map[string]string{ + webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), + } +} diff --git a/internal/rpc/msg/filter.go b/internal/rpc/msg/filter.go index ed1a488f1..36511ec7b 100644 --- a/internal/rpc/msg/filter.go +++ b/internal/rpc/msg/filter.go @@ -1,11 +1,13 @@ package msg import ( + "strconv" + "strings" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/protocol/constant" pbchat "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/utils/datautil" - "strconv" - "strings" ) const ( @@ -13,28 +15,50 @@ const ( ) func filterAfterMsg(msg *pbchat.SendMsgReq, after *config.AfterConfig) bool { - return filterMsg(msg, after.AttentionIds, after.AllowedTypes, after.DeniedTypes) + return filterMsg(msg, after.AttentionIds, after.DeniedTypes) } func filterBeforeMsg(msg *pbchat.SendMsgReq, before *config.BeforeConfig) bool { - return filterMsg(msg, nil, before.AllowedTypes, before.DeniedTypes) + return filterMsg(msg, nil, before.DeniedTypes) } -func filterMsg(msg *pbchat.SendMsgReq, attentionIds, allowedTypes, deniedTypes []string) bool { +func filterMsg(msg *pbchat.SendMsgReq, attentionIds []string, deniedTypes []int32) bool { // According to the attentionIds configuration, only some users are sent - if len(attentionIds) != 0 && !datautil.Contains([]string{msg.MsgData.SendID, msg.MsgData.RecvID}, attentionIds...) { + if len(attentionIds) != 0 && !datautil.Contain(msg.MsgData.RecvID, attentionIds...) { return false } - if len(allowedTypes) != 0 && !isInInterval(msg.MsgData.ContentType, allowedTypes) { + + if defaultDeniedTypes(msg.MsgData.ContentType) { return false } - if len(deniedTypes) != 0 && isInInterval(msg.MsgData.ContentType, deniedTypes) { + + if len(deniedTypes) != 0 && datautil.Contain(msg.MsgData.ContentType, deniedTypes...) { return false } + //if len(allowedTypes) != 0 && !isInInterval(msg.MsgData.ContentType, allowedTypes) { + // return false + //} + //if len(deniedTypes) != 0 && isInInterval(msg.MsgData.ContentType, deniedTypes) { + // return false + //} return true } -func isInInterval(contentType int32, interval []string) bool { +func defaultDeniedTypes(contentType int32) bool { + if contentType >= constant.NotificationBegin && contentType <= constant.NotificationEnd { + return true + } + if contentType == constant.Typing { + return true + } + return false +} + +// isInInterval if data is in interval +// Supports two formats: a single type or a range. The range is defined by the lower and upper bounds connected with a hyphen ("-") +// e.g. [1, 100, 200-500, 600-700] means that only data within the range +// {1, 100} ∪ [200, 500] ∪ [600, 700] will return true. +func isInInterval(data int32, interval []string) bool { for _, v := range interval { if strings.Contains(v, separator) { // is interval @@ -50,7 +74,7 @@ func isInInterval(contentType int32, interval []string) bool { if err != nil { continue } - if datautil.BetweenEq(int(contentType), bottom, top) { + if datautil.BetweenEq(int(data), bottom, top) { return true } } else { @@ -58,7 +82,7 @@ func isInInterval(contentType int32, interval []string) bool { if err != nil { continue } - if int(contentType) == iv { + if int(data) == iv { return true } } diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 15e93a988..d4fe7ecc4 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -566,7 +566,7 @@ func (s *userServer) SearchNotificationAccount(ctx context.Context, req *pbuser. } // Convert users to response format - resp := s.userModelToResp(users, req.Pagination) + resp := s.userModelToResp(users, req.Pagination, req.AppManagerLevel) if resp.Total != 0 { return resp, nil } @@ -576,17 +576,24 @@ func (s *userServer) SearchNotificationAccount(ctx context.Context, req *pbuser. if err != nil { return nil, err } - resp = s.userModelToResp(users, req.Pagination) + resp = s.userModelToResp(users, req.Pagination, req.AppManagerLevel) return resp, nil } // If no keyword, find users with notification settings - users, err = s.db.FindNotification(ctx, constant.AppNotificationAdmin) - if err != nil { - return nil, err + if req.AppManagerLevel != nil { + users, err = s.db.FindNotification(ctx, int64(*req.AppManagerLevel)) + if err != nil { + return nil, err + } + } else { + users, err = s.db.FindSystemAccount(ctx) + if err != nil { + return nil, err + } } - resp := s.userModelToResp(users, req.Pagination) + resp := s.userModelToResp(users, req.Pagination, req.AppManagerLevel) return resp, nil } @@ -625,11 +632,16 @@ func (s *userServer) genUserID() string { return string(data) } -func (s *userServer) userModelToResp(users []*tablerelation.User, pagination pagination.Pagination) *pbuser.SearchNotificationAccountResp { +func (s *userServer) userModelToResp(users []*tablerelation.User, pagination pagination.Pagination, appManagerLevel *int32) *pbuser.SearchNotificationAccountResp { accounts := make([]*pbuser.NotificationAccountInfo, 0) var total int64 for _, v := range users { if v.AppMangerLevel >= constant.AppNotificationAdmin && !datautil.Contain(v.UserID, s.config.Share.IMAdminUserID...) { + if appManagerLevel != nil { + if v.AppMangerLevel != *appManagerLevel { + continue + } + } temp := &pbuser.NotificationAccountInfo{ UserID: v.UserID, FaceURL: v.FaceURL, diff --git a/pkg/apistruct/manage.go b/pkg/apistruct/manage.go index f4deb9fb1..4f1d5863f 100644 --- a/pkg/apistruct/manage.go +++ b/pkg/apistruct/manage.go @@ -111,6 +111,21 @@ type BatchSendMsgResp struct { FailedIDs []string `json:"failedUserIDs"` } +// SendSingleMsgReq defines the structure for sending a message to multiple recipients. +type SendSingleMsgReq struct { + // groupMsg should appoint sendID + SendID string `json:"sendID"` + Content string `json:"content" binding:"required"` + OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"` + Ex string `json:"ex"` +} + +type KeyMsgData struct { + SendID string `json:"sendID"` + RecvID string `json:"recvID"` + GroupID string `json:"groupID"` +} + // SingleReturnResult encapsulates the result of a single message send attempt. type SingleReturnResult struct { // ServerMsgID is the message identifier on the server-side. diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 275dec70f..eb38b64ac 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -364,19 +364,17 @@ type Redis struct { } type BeforeConfig struct { - Enable bool `mapstructure:"enable"` - Timeout int `mapstructure:"timeout"` - FailedContinue bool `mapstructure:"failedContinue"` - AllowedTypes []string `mapstructure:"allowedTypes"` - DeniedTypes []string `mapstructure:"deniedTypes"` + Enable bool `yaml:"enable"` + Timeout int `yaml:"timeout"` + FailedContinue bool `yaml:"failedContinue"` + DeniedTypes []int32 `yaml:"deniedTypes"` } type AfterConfig struct { - Enable bool `mapstructure:"enable"` - Timeout int `mapstructure:"timeout"` - AttentionIds []string `mapstructure:"attentionIds"` - AllowedTypes []string `mapstructure:"allowedTypes"` - DeniedTypes []string `mapstructure:"deniedTypes"` + Enable bool `yaml:"enable"` + Timeout int `yaml:"timeout"` + AttentionIds []string `yaml:"attentionIds"` + DeniedTypes []int32 `yaml:"deniedTypes"` } type Share struct { diff --git a/pkg/common/storage/controller/user.go b/pkg/common/storage/controller/user.go index 3f34481a3..a8ef1033e 100644 --- a/pkg/common/storage/controller/user.go +++ b/pkg/common/storage/controller/user.go @@ -20,6 +20,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/db/tx" "github.com/openimsdk/tools/utils/datautil" @@ -37,8 +38,10 @@ type UserDatabase interface { Find(ctx context.Context, userIDs []string) (users []*model.User, err error) // Find userInfo By Nickname FindByNickname(ctx context.Context, nickname string) (users []*model.User, err error) - // Find notificationAccounts + // FindNotification find system account by level FindNotification(ctx context.Context, level int64) (users []*model.User, err error) + // FindSystemAccount find all system account + FindSystemAccount(ctx context.Context) (users []*model.User, err error) // Create Insert multiple external guarantees that the userID is not repeated and does not exist in the storage Create(ctx context.Context, users []*model.User) (err error) // UpdateByMap update (zero value) external guarantee userID exists @@ -139,6 +142,10 @@ func (u *userDatabase) FindNotification(ctx context.Context, level int64) (users return u.userDB.TakeNotification(ctx, level) } +func (u *userDatabase) FindSystemAccount(ctx context.Context) (users []*model.User, err error) { + return u.userDB.TakeGTEAppManagerLevel(ctx, constant.AppNotificationAdmin) +} + // Create Insert multiple external guarantees that the userID is not repeated and does not exist in the storage. func (u *userDatabase) Create(ctx context.Context, users []*model.User) (err error) { return u.tx.Transaction(ctx, func(ctx context.Context) error { diff --git a/pkg/common/storage/database/mgo/user.go b/pkg/common/storage/database/mgo/user.go index ee92b7554..a08765baf 100644 --- a/pkg/common/storage/database/mgo/user.go +++ b/pkg/common/storage/database/mgo/user.go @@ -16,9 +16,10 @@ package mgo import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "time" "github.com/openimsdk/protocol/user" "github.com/openimsdk/tools/db/mongoutil" @@ -71,6 +72,10 @@ func (u *UserMgo) TakeNotification(ctx context.Context, level int64) (user []*mo return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manger_level": level}) } +func (u *UserMgo) TakeGTEAppManagerLevel(ctx context.Context, level int64) (user []*model.User, err error) { + return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manager_level": bson.M{"$gte": level}}) +} + func (u *UserMgo) TakeByNickname(ctx context.Context, nickname string) (user []*model.User, err error) { return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"nickname": nickname}) } diff --git a/pkg/common/storage/database/user.go b/pkg/common/storage/database/user.go index 4ddc8285f..c597424b9 100644 --- a/pkg/common/storage/database/user.go +++ b/pkg/common/storage/database/user.go @@ -16,10 +16,11 @@ package database import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/protocol/user" "github.com/openimsdk/tools/db/pagination" - "time" ) type User interface { @@ -28,6 +29,7 @@ type User interface { Find(ctx context.Context, userIDs []string) (users []*model.User, err error) Take(ctx context.Context, userID string) (user *model.User, err error) TakeNotification(ctx context.Context, level int64) (user []*model.User, err error) + TakeGTEAppManagerLevel(ctx context.Context, level int64) (user []*model.User, err error) TakeByNickname(ctx context.Context, nickname string) (user []*model.User, err error) Page(ctx context.Context, pagination pagination.Pagination) (count int64, users []*model.User, err error) PageFindUser(ctx context.Context, level1 int64, level2 int64, pagination pagination.Pagination) (count int64, users []*model.User, err error) diff --git a/pkg/common/webhook/http_client.go b/pkg/common/webhook/http_client.go index e46f08806..0cd13f6e2 100644 --- a/pkg/common/webhook/http_client.go +++ b/pkg/common/webhook/http_client.go @@ -17,6 +17,9 @@ package webhook import ( "context" "encoding/json" + "net/http" + "net/url" + "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" @@ -25,7 +28,6 @@ import ( "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mq/memamq" "github.com/openimsdk/tools/utils/httputil" - "net/http" ) type Client struct { @@ -37,6 +39,8 @@ type Client struct { const ( webhookWorkerCount = 2 webhookBufferSize = 100 + + Key = "key" ) func NewWebhookClient(url string, options ...*memamq.MemoryQueue) *Client { @@ -66,6 +70,12 @@ func (c *Client) AsyncPost(ctx context.Context, command string, req callbackstru } } +func (c *Client) AsyncPostWithQuery(ctx context.Context, command string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, after *config.AfterConfig, queryParams map[string]string) { + if after.Enable { + c.queue.Push(func() { c.postWithQuery(ctx, command, req, resp, after.Timeout, queryParams) }) + } +} + func (c *Client) post(ctx context.Context, command string, input interface{}, output callbackstruct.CallbackResp, timeout int) error { ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)}) fullURL := c.url + "/" + command @@ -84,3 +94,41 @@ func (c *Client) post(ctx context.Context, command string, input interface{}, ou log.ZInfo(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b)) return nil } + +func (c *Client) postWithQuery(ctx context.Context, command string, input interface{}, output callbackstruct.CallbackResp, timeout int, queryParams map[string]string) error { + ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)}) + fullURL := c.url + "/" + command + + parsedURL, err := url.Parse(fullURL) + if err != nil { + return servererrs.ErrNetwork.WrapMsg(err.Error(), "failed to parse URL", fullURL) + } + + query := parsedURL.Query() + + operationID, _ := ctx.Value(constant.OperationID).(string) + + for key, value := range queryParams { + query.Set(key, value) + } + + parsedURL.RawQuery = query.Encode() + + fullURL = parsedURL.String() + log.ZInfo(ctx, "webhook", "url", fullURL, "input", input, "config", timeout) + + b, err := c.client.Post(ctx, fullURL, map[string]string{constant.OperationID: operationID}, input, timeout) + if err != nil { + return servererrs.ErrNetwork.WrapMsg(err.Error(), "post url", fullURL) + } + + if err = json.Unmarshal(b, output); err != nil { + return servererrs.ErrData.WithDetail(err.Error() + " response format error") + } + if err := output.Parse(); err != nil { + return err + } + + log.ZInfo(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b)) + return nil +} From 6bba316e649f8b11f90eb4a35e7b971713119255 Mon Sep 17 00:00:00 2001 From: OpenIM-Gordon <1432970085@qq.com> Date: Fri, 28 Feb 2025 16:22:00 +0800 Subject: [PATCH 5/9] feat: add a new message type: Markdown text (#3162) --- go.mod | 4 ++-- go.sum | 12 ++++++------ internal/api/msg.go | 3 +++ internal/rpc/msg/verify.go | 31 +++++++------------------------ pkg/apistruct/msg.go | 9 +++++++++ 5 files changed, 27 insertions(+), 32 deletions(-) diff --git a/go.mod b/go.mod index 9a66d7cea..e515400a0 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.71 - github.com/openimsdk/tools v0.0.50-alpha.72 + github.com/openimsdk/protocol v0.0.72-alpha.75 + github.com/openimsdk/tools v0.0.50-alpha.74 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 6bd4ecde0..e8f1300f4 100644 --- a/go.sum +++ b/go.sum @@ -345,12 +345,12 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= -github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.71 h1:R3utzOlqepaJWTAmnfJi4ccUM/XIoFasSyjQMOipM70= -github.com/openimsdk/protocol v0.0.72-alpha.71/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.72 h1:d/vaZjIfvrNp3EeRJEIiamBO7HiPx6CP4wiuq8NpfzY= -github.com/openimsdk/tools v0.0.50-alpha.72/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= +github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= +github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +github.com/openimsdk/protocol v0.0.72-alpha.75 h1:WlmBn8g2Fvv21g8TEFVbolmbw2rU0sN9kj6sQaDK7cA= +github.com/openimsdk/protocol v0.0.72-alpha.75/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= +github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/msg.go b/internal/api/msg.go index 0b73af0cb..eeea3b816 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -18,6 +18,7 @@ import ( "github.com/gin-gonic/gin" "github.com/go-playground/validator/v10" "github.com/mitchellh/mapstructure" + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -171,6 +172,8 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM data = apistruct.AtElem{} case constant.Custom: data = apistruct.CustomElem{} + case constant.MarkdownText: + data = apistruct.MarkdownTextElem{} case constant.OANotification: data = apistruct.OANotificationElem{} req.SessionType = constant.NotificationChatType diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index f6c3147ba..a492232a4 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -16,13 +16,14 @@ package msg import ( "context" + "math/rand" + "strconv" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/encrypt" "github.com/openimsdk/tools/utils/timeutil" - "math/rand" - "strconv" - "time" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msg" @@ -137,27 +138,9 @@ func (m *msgServer) encapsulateMsgData(msg *sdkws.MsgData) { msg.SendTime = timeutil.GetCurrentTimestampByMill() } switch msg.ContentType { - case constant.Text: - fallthrough - case constant.Picture: - fallthrough - case constant.Voice: - fallthrough - case constant.Video: - fallthrough - case constant.File: - fallthrough - case constant.AtText: - fallthrough - case constant.Merger: - fallthrough - case constant.Card: - fallthrough - case constant.Location: - fallthrough - case constant.Custom: - fallthrough - case constant.Quote: + case constant.Text, constant.Picture, constant.Voice, constant.Video, + constant.File, constant.AtText, constant.Merger, constant.Card, + constant.Location, constant.Custom, constant.Quote, constant.AdvancedText, constant.MarkdownText: case constant.Revoke: datautil.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false) datautil.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false) diff --git a/pkg/apistruct/msg.go b/pkg/apistruct/msg.go index dc20b5104..40444d6f5 100644 --- a/pkg/apistruct/msg.go +++ b/pkg/apistruct/msg.go @@ -81,6 +81,15 @@ type TextElem struct { Content string `json:"content" validate:"required"` } +type MarkdownTextElem struct { + Content string `mapstructure:"content" validate:"required"` +} + +type StreamMsgElem struct { + Type string `mapstructure:"type" validate:"required"` + Content string `mapstructure:"content" validate:"required"` +} + type RevokeElem struct { RevokeMsgClientID string `mapstructure:"revokeMsgClientID" validate:"required"` } From a66312dabe42730b0afd8186d6f332386e497d32 Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Tue, 4 Mar 2025 18:04:21 +0800 Subject: [PATCH 6/9] feat: system account send msg doesn't need friend verify (#3187) --- internal/rpc/msg/verify.go | 12 ++++++++++++ pkg/authverify/token.go | 5 +++++ 2 files changed, 17 insertions(+) diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index f6c3147ba..37f0dae97 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -16,6 +16,11 @@ package msg import ( "context" + "math/rand" + "strconv" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/encrypt" @@ -62,6 +67,13 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe if err := m.webhookBeforeSendSingleMsg(ctx, &m.config.WebhooksConfig.BeforeSendSingleMsg, data); err != nil { return err } + u, err := m.UserLocalCache.GetUserInfo(ctx, data.MsgData.SendID) + if err != nil { + return err + } + if authverify.CheckSystemAccount(ctx, u.AppMangerLevel) { + return nil + } black, err := m.FriendLocalCache.IsBlack(ctx, data.MsgData.SendID, data.MsgData.RecvID) if err != nil { return err diff --git a/pkg/authverify/token.go b/pkg/authverify/token.go index f1b377bad..872feb1cf 100644 --- a/pkg/authverify/token.go +++ b/pkg/authverify/token.go @@ -20,6 +20,7 @@ import ( "github.com/golang-jwt/jwt/v4" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" + "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" ) @@ -55,3 +56,7 @@ func CheckAdmin(ctx context.Context, imAdminUserID []string) error { func IsManagerUserID(opUserID string, imAdminUserID []string) bool { return datautil.Contain(opUserID, imAdminUserID...) } + +func CheckSystemAccount(ctx context.Context, level int32) bool { + return level >= constant.AppAdmin +} From 883ea4dcb9880e4345c45706c31503d16c05b830 Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Wed, 5 Mar 2025 17:04:57 +0800 Subject: [PATCH 7/9] feat: sending messages supports returning fields modified by webhook (#3192) * pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * fix * fix * optimize log output * feat: support GetLastMessage * feat: support GetLastMessage * feat: s3 switch * feat: s3 switch * fix: GetUsersOnline * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: seq conversion failed without exiting * fix: DeleteDoc crash * fix: fill send time * fix: fill send time * fix: crash caused by withdrawing messages from users who have left the group * fix: user msg timestamp * seq read config * seq read config * fix: the source message of the reference is withdrawn, and the referenced message is deleted * feat: optimize the default notification.yml * fix: shouldPushOffline * fix: the sorting is wrong after canceling the administrator in group settings * feat: Sending messages supports returning fields modified by webhook * feat: Sending messages supports returning fields modified by webhook * feat: Sending messages supports returning fields modified by webhook --- go.mod | 4 +- go.sum | 12 +-- internal/api/msg.go | 161 ++++++++++++++++++++++++++++++++- internal/rpc/msg/callback.go | 21 ++++- internal/rpc/msg/send.go | 55 +++++++---- pkg/apistruct/manage.go | 12 +++ pkg/apistruct/msg_test.go | 1 - test/webhook/msgmodify/main.go | 65 +++++++++++++ 8 files changed, 296 insertions(+), 35 deletions(-) delete mode 100644 pkg/apistruct/msg_test.go create mode 100644 test/webhook/msgmodify/main.go diff --git a/go.mod b/go.mod index 9a66d7cea..0a9de4010 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.71 - github.com/openimsdk/tools v0.0.50-alpha.72 + github.com/openimsdk/protocol v0.0.72-alpha.79 + github.com/openimsdk/tools v0.0.50-alpha.74 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 6bd4ecde0..2a86d97ea 100644 --- a/go.sum +++ b/go.sum @@ -345,12 +345,12 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= -github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.71 h1:R3utzOlqepaJWTAmnfJi4ccUM/XIoFasSyjQMOipM70= -github.com/openimsdk/protocol v0.0.72-alpha.71/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.72 h1:d/vaZjIfvrNp3EeRJEIiamBO7HiPx6CP4wiuq8NpfzY= -github.com/openimsdk/tools v0.0.50-alpha.72/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= +github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= +github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +github.com/openimsdk/protocol v0.0.72-alpha.79 h1:e46no8WVAsmTzyy405klrdoUiG7u+1ohDsXvQuFng4s= +github.com/openimsdk/protocol v0.0.72-alpha.79/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= +github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/msg.go b/internal/api/msg.go index 0b73af0cb..e06d5a7a4 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -15,9 +15,15 @@ package api import ( + "encoding/base64" + "encoding/json" + "sync" + "github.com/gin-gonic/gin" "github.com/go-playground/validator/v10" "github.com/mitchellh/mapstructure" + "google.golang.org/protobuf/reflect/protoreflect" + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -36,6 +42,39 @@ import ( "github.com/openimsdk/tools/utils/timeutil" ) +var ( + msgDataDescriptor []protoreflect.FieldDescriptor + msgDataDescriptorOnce sync.Once +) + +func getMsgDataDescriptor() []protoreflect.FieldDescriptor { + msgDataDescriptorOnce.Do(func() { + skip := make(map[string]struct{}) + respFields := new(msg.SendMsgResp).ProtoReflect().Descriptor().Fields() + for i := 0; i < respFields.Len(); i++ { + field := respFields.Get(i) + if !field.HasJSONName() { + continue + } + skip[field.JSONName()] = struct{}{} + } + fields := new(sdkws.MsgData).ProtoReflect().Descriptor().Fields() + num := fields.Len() + msgDataDescriptor = make([]protoreflect.FieldDescriptor, 0, num) + for i := 0; i < num; i++ { + field := fields.Get(i) + if !field.HasJSONName() { + continue + } + if _, ok := skip[field.JSONName()]; ok { + continue + } + msgDataDescriptor = append(msgDataDescriptor, fields.Get(i)) + } + }) + return msgDataDescriptor +} + type MessageApi struct { Client msg.MsgClient userClient *rpcli.UserClient @@ -190,6 +229,42 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM return m.newUserSendMsgReq(c, &req), nil } +func (m *MessageApi) getModifyFields(req, respModify *sdkws.MsgData) map[string]any { + if req == nil || respModify == nil { + return nil + } + fields := make(map[string]any) + reqProtoReflect := req.ProtoReflect() + respProtoReflect := respModify.ProtoReflect() + for _, descriptor := range getMsgDataDescriptor() { + reqValue := reqProtoReflect.Get(descriptor) + respValue := respProtoReflect.Get(descriptor) + if !reqValue.Equal(respValue) { + val := respValue.Interface() + name := descriptor.JSONName() + if name == "content" { + if bs, ok := val.([]byte); ok { + val = string(bs) + } + } + fields[name] = val + } + } + if len(fields) == 0 { + fields = nil + } + return fields +} + +func (m *MessageApi) ginRespSendMsg(c *gin.Context, req *msg.SendMsgReq, resp *msg.SendMsgResp) { + res := m.getModifyFields(req.MsgData, resp.Modify) + resp.Modify = nil + apiresp.GinSuccess(c, &apistruct.SendMsgResp{ + SendMsgResp: resp, + Modify: res, + }) +} + // SendMessage handles the sending of a message. It's an HTTP handler function to be used with Gin framework. func (m *MessageApi) SendMessage(c *gin.Context) { // Initialize a request struct for sending a message. @@ -243,7 +318,7 @@ func (m *MessageApi) SendMessage(c *gin.Context) { } // Respond with a success message and the response payload. - apiresp.GinSuccess(c, respPb) + m.ginRespSendMsg(c, sendMsgReq, respPb) } func (m *MessageApi) SendBusinessNotification(c *gin.Context) { @@ -309,7 +384,7 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) { apiresp.GinError(c, err) return } - apiresp.GinSuccess(c, respPb) + m.ginRespSendMsg(c, &sendMsgReq, respPb) } func (m *MessageApi) BatchSendMsg(c *gin.Context) { @@ -363,11 +438,93 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) { ClientMsgID: rpcResp.ClientMsgID, SendTime: rpcResp.SendTime, RecvID: recvID, + Modify: m.getModifyFields(sendMsgReq.MsgData, rpcResp.Modify), }) } apiresp.GinSuccess(c, resp) } +func (m *MessageApi) SendSimpleMessage(c *gin.Context) { + encodedKey, ok := c.GetQuery(webhook.Key) + if !ok { + apiresp.GinError(c, errs.ErrArgs.WithDetail("missing key in query").Wrap()) + return + } + + decodedData, err := base64.StdEncoding.DecodeString(encodedKey) + if err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + var ( + req apistruct.SendSingleMsgReq + keyMsgData apistruct.KeyMsgData + + sendID string + sessionType int32 + recvID string + ) + err = json.Unmarshal(decodedData, &keyMsgData) + if err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + if keyMsgData.GroupID != "" { + sessionType = constant.ReadGroupChatType + sendID = req.SendID + } else { + sessionType = constant.SingleChatType + sendID = keyMsgData.RecvID + recvID = keyMsgData.SendID + } + // check param + if keyMsgData.SendID == "" { + apiresp.GinError(c, errs.ErrArgs.WithDetail("missing recvID or GroupID").Wrap()) + return + } + if sendID == "" { + apiresp.GinError(c, errs.ErrArgs.WithDetail("missing sendID").Wrap()) + return + } + + msgData := &sdkws.MsgData{ + SendID: sendID, + RecvID: recvID, + GroupID: keyMsgData.GroupID, + ClientMsgID: idutil.GetMsgIDByMD5(sendID), + SenderPlatformID: constant.AdminPlatformID, + SessionType: sessionType, + MsgFrom: constant.UserMsgType, + ContentType: constant.Text, + Content: []byte(req.Content), + OfflinePushInfo: req.OfflinePushInfo, + Ex: req.Ex, + } + + sendReq := &msg.SendMsgReq{ + MsgData: msgData, + } + + respPb, err := m.Client.SendMsg(c, sendReq) + if err != nil { + apiresp.GinError(c, err) + return + } + + var status = constant.MsgSendSuccessed + + _, err = m.Client.SetSendMsgStatus(c, &msg.SetSendMsgStatusReq{ + Status: int32(status), + }) + + if err != nil { + apiresp.GinError(c, err) + return + } + + m.ginRespSendMsg(c, sendReq, respPb) +} + func (m *MessageApi) CheckMsgIsSendSuccess(c *gin.Context) { a2r.Call(c, msg.MsgClient.GetSendMsgStatus, m.Client) } diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index c66dd6ca9..5bbe64f3d 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -16,7 +16,13 @@ package msg import ( "context" + "encoding/base64" + "encoding/json" + + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/utils/stringutil" cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -131,11 +137,11 @@ func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config. m.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after) } -func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { +func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { - if msg.MsgData.ContentType != constant.Text { - return nil - } + //if msg.MsgData.ContentType != constant.Text { + // return nil + //} if !filterBeforeMsg(msg, before) { return nil } @@ -146,9 +152,14 @@ func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.B if err := m.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } - + if beforeMsgData != nil { + *beforeMsgData = proto.Clone(msg.MsgData).(*sdkws.MsgData) + } if resp.Content != nil { msg.MsgData.Content = []byte(*resp.Content) + if err := json.Unmarshal(msg.MsgData.Content, &struct{}{}); err != nil { + return errs.ErrArgs.WrapMsg("webhook msg modify content is not json", "content", string(msg.MsgData.Content)) + } } datautil.NotNilReplace(msg.MsgData.OfflinePushInfo, resp.OfflinePushInfo) datautil.NotNilReplace(&msg.MsgData.RecvID, resp.RecvID) diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index c93f18148..0731dd8d4 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -29,26 +29,44 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" + "google.golang.org/protobuf/proto" ) func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { - if req.MsgData != nil { - m.encapsulateMsgData(req.MsgData) - switch req.MsgData.SessionType { - case constant.SingleChatType: - return m.sendMsgSingleChat(ctx, req) - case constant.NotificationChatType: - return m.sendMsgNotification(ctx, req) - case constant.ReadGroupChatType: - return m.sendMsgGroupChat(ctx, req) - default: - return nil, errs.ErrArgs.WrapMsg("unknown sessionType") + if req.MsgData == nil { + return nil, errs.ErrArgs.WrapMsg("msgData is nil") + } + before := new(*sdkws.MsgData) + resp, err := m.sendMsg(ctx, req, before) + if err != nil { + return nil, err + } + if *before != nil && proto.Equal(*before, req.MsgData) == false { + resp.Modify = req.MsgData + } + return resp, nil +} + +func (m *msgServer) sendMsg(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (*pbmsg.SendMsgResp, error) { + m.encapsulateMsgData(req.MsgData) + if req.MsgData.ContentType == constant.Stream { + if err := m.handlerStreamMsg(ctx, req.MsgData); err != nil { + return nil, err } } - return nil, errs.ErrArgs.WrapMsg("msgData is nil") + switch req.MsgData.SessionType { + case constant.SingleChatType: + return m.sendMsgSingleChat(ctx, req, before) + case constant.NotificationChatType: + return m.sendMsgNotification(ctx, req, before) + case constant.ReadGroupChatType: + return m.sendMsgGroupChat(ctx, req, before) + default: + return nil, errs.ErrArgs.WrapMsg("unknown sessionType") + } } -func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) { +func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (resp *pbmsg.SendMsgResp, err error) { if err = m.messageVerification(ctx, req); err != nil { prommetrics.GroupChatMsgProcessFailedCounter.Inc() return nil, err @@ -57,7 +75,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq) if err = m.webhookBeforeSendGroupMsg(ctx, &m.config.WebhooksConfig.BeforeSendGroupMsg, req); err != nil { return nil, err } - if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req); err != nil { + if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req, before); err != nil { return nil, err } err = m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData) @@ -139,7 +157,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa } } -func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) { +func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (resp *pbmsg.SendMsgResp, err error) { if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil { return nil, err } @@ -151,7 +169,7 @@ func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgR return resp, nil } -func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) { +func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (resp *pbmsg.SendMsgResp, err error) { if err := m.messageVerification(ctx, req); err != nil { return nil, err } @@ -171,12 +189,11 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq } if !isSend { prommetrics.SingleChatMsgProcessFailedCounter.Inc() - return nil, nil + return nil, errs.ErrArgs.WrapMsg("message is not sent") } else { - if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req); err != nil { + if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req, before); err != nil { return nil, err } - if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil { prommetrics.SingleChatMsgProcessFailedCounter.Inc() return nil, err diff --git a/pkg/apistruct/manage.go b/pkg/apistruct/manage.go index f4deb9fb1..d3c1b25e2 100644 --- a/pkg/apistruct/manage.go +++ b/pkg/apistruct/manage.go @@ -15,6 +15,7 @@ package apistruct import ( + pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" ) @@ -124,4 +125,15 @@ type SingleReturnResult struct { // RecvID uniquely identifies the receiver of the message. RecvID string `json:"recvID"` + + // Modify fields modified via webhook. + Modify map[string]any `json:"modify,omitempty"` +} + +type SendMsgResp struct { + // SendMsgResp original response. + *pbmsg.SendMsgResp + + // Modify fields modified via webhook. + Modify map[string]any `json:"modify,omitempty"` } diff --git a/pkg/apistruct/msg_test.go b/pkg/apistruct/msg_test.go deleted file mode 100644 index 28f878a9f..000000000 --- a/pkg/apistruct/msg_test.go +++ /dev/null @@ -1 +0,0 @@ -package apistruct diff --git a/test/webhook/msgmodify/main.go b/test/webhook/msgmodify/main.go new file mode 100644 index 000000000..3bdfa0ec5 --- /dev/null +++ b/test/webhook/msgmodify/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + + "github.com/gin-gonic/gin" + cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" + "github.com/openimsdk/protocol/constant" +) + +func main() { + g := gin.Default() + g.POST("/callbackExample/callbackBeforeMsgModifyCommand", toGin(handlerMsg)) + if err := g.Run(":10006"); err != nil { + panic(err) + } +} + +func toGin[R any](fn func(c *gin.Context, req *R)) gin.HandlerFunc { + return func(c *gin.Context) { + body, err := io.ReadAll(c.Request.Body) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + fmt.Printf("HTTP %s %s %s\n", c.Request.Method, c.Request.URL, body) + var req R + if err := json.Unmarshal(body, &req); err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + fn(c, &req) + } +} + +func handlerMsg(c *gin.Context, req *cbapi.CallbackMsgModifyCommandReq) { + var resp cbapi.CallbackMsgModifyCommandResp + if req.ContentType != constant.Text { + c.JSON(http.StatusOK, &resp) + return + } + var textElem struct { + Content string `json:"content"` + } + if err := json.Unmarshal([]byte(req.Content), &textElem); err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + const word = "xxx" + if strings.Contains(textElem.Content, word) { + textElem.Content = strings.ReplaceAll(textElem.Content, word, strings.Repeat("*", len(word))) + content, err := json.Marshal(&textElem) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + tmp := string(content) + resp.Content = &tmp + } + c.JSON(http.StatusOK, &resp) +} From aeee3f33b19810ed9c5214e2b8827574be48026a Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 7 Mar 2025 14:48:44 +0800 Subject: [PATCH 8/9] resolving merge conflicts --- go.sum | 4 +- internal/api/config_manager.go | 2 +- internal/api/init.go | 2 +- internal/msggateway/ws_server.go | 12 ++- internal/msgtransfer/init.go | 2 +- .../msgtransfer/online_history_msg_handler.go | 2 +- .../online_msg_to_mongo_handler.go | 2 +- internal/push/offlinepush_handler.go | 2 +- internal/push/onlinepusher.go | 2 +- internal/push/push_handler.go | 2 +- internal/rpc/auth/auth.go | 2 +- internal/rpc/msg/send.go | 5 -- internal/rpc/third/s3.go | 8 +- internal/tools/cron_task.go | 2 +- pkg/common/cmd/root.go | 4 +- pkg/common/config/config.go | 2 +- pkg/common/startrpc/start.go | 3 +- pkg/common/storage/controller/msg.go | 2 +- pkg/common/storage/controller/msg_transfer.go | 2 +- pkg/common/storage/controller/push.go | 2 +- pkg/common/storage/controller/s3.go | 4 +- pkg/common/storage/kafka/config.go | 33 +++++++ pkg/common/storage/kafka/consumer_group.go | 68 +++++++++++++++ pkg/common/storage/kafka/producer.go | 82 ++++++++++++++++++ pkg/common/storage/kafka/sarama.go | 85 +++++++++++++++++++ pkg/common/storage/kafka/tls.go | 83 ++++++++++++++++++ pkg/common/storage/kafka/util.go | 34 ++++++++ pkg/common/storage/kafka/verify.go | 79 +++++++++++++++++ tools/check-component/main.go | 4 +- tools/seq/internal/main.go | 2 +- 30 files changed, 503 insertions(+), 35 deletions(-) create mode 100644 pkg/common/storage/kafka/config.go create mode 100644 pkg/common/storage/kafka/consumer_group.go create mode 100644 pkg/common/storage/kafka/producer.go create mode 100644 pkg/common/storage/kafka/sarama.go create mode 100644 pkg/common/storage/kafka/tls.go create mode 100644 pkg/common/storage/kafka/util.go create mode 100644 pkg/common/storage/kafka/verify.go diff --git a/go.sum b/go.sum index 2a86d97ea..66af77379 100644 --- a/go.sum +++ b/go.sum @@ -345,8 +345,8 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= -github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= +github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.72-alpha.79 h1:e46no8WVAsmTzyy405klrdoUiG7u+1ohDsXvQuFng4s= github.com/openimsdk/protocol v0.0.72-alpha.79/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= diff --git a/internal/api/config_manager.go b/internal/api/config_manager.go index c61b2cb0b..4d846dd9b 100644 --- a/internal/api/config_manager.go +++ b/internal/api/config_manager.go @@ -73,7 +73,7 @@ func (cm *ConfigManager) GetConfig(c *gin.Context) { func (cm *ConfigManager) GetConfigList(c *gin.Context) { var resp apistruct.GetConfigListResp resp.ConfigNames = cm.config.GetConfigNames() - resp.Environment = runtimeenv.PrintRuntimeEnvironment() + resp.Environment = runtimeenv.RuntimeEnvironment() resp.Version = version.Version apiresp.GinSuccess(c, resp) diff --git a/internal/api/init.go b/internal/api/init.go index 20237ebc2..4bd29c9e0 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -56,7 +56,7 @@ func Start(ctx context.Context, index int, config *Config) error { return err } - config.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment() + config.RuntimeEnv = runtimeenv.RuntimeEnvironment() client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv, []string{ config.Discovery.RpcService.MessageGateway, diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 24dd823f6..0731074c0 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -223,15 +223,19 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C if err != nil { return err } + if len(conns) == 0 || (len(conns) == 1 && ws.disCov.IsSelfNode(conns[0])) { + return nil + } + wg := errgroup.Group{} wg.SetLimit(concurrentRequest) // Online push user online message to other node for _, v := range conns { v := v - log.ZDebug(ctx, " sendUserOnlineInfoToOtherNode conn ", "target", v.Target()) - if v.Target() == ws.disCov.GetSelfConnTarget() { - log.ZDebug(ctx, "Filter out this node", "node", v.Target()) + log.ZDebug(ctx, "sendUserOnlineInfoToOtherNode conn") + if ws.disCov.IsSelfNode(v) { + log.ZDebug(ctx, "Filter out this node") continue } @@ -242,7 +246,7 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C PlatformID: int32(client.PlatformID), Token: client.token, }) if err != nil { - log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target()) + log.ZWarn(ctx, "MultiTerminalLoginCheck err", err) } return nil }) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 96e6bbde0..ea7ebb24c 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -74,7 +74,7 @@ type Config struct { } func Start(ctx context.Context, index int, config *Config) error { - runTimeEnv := runtimeenv.PrintRuntimeEnvironment() + runTimeEnv := runtimeenv.RuntimeEnvironment() log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runTimeEnv, "prometheusPorts", config.MsgTransfer.Prometheus.Ports, "index", index) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 6334c95fd..e1d3f71fd 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -29,6 +29,7 @@ import ( "github.com/go-redis/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/tools/batcher" "github.com/openimsdk/protocol/constant" @@ -37,7 +38,6 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/stringutil" "google.golang.org/protobuf/proto" ) diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 8405be7fe..01872562e 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -21,9 +21,9 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/kafka" "google.golang.org/protobuf/proto" ) diff --git a/internal/push/offlinepush_handler.go b/internal/push/offlinepush_handler.go index 5c69da005..c685a188a 100644 --- a/internal/push/offlinepush_handler.go +++ b/internal/push/offlinepush_handler.go @@ -7,12 +7,12 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/protocol/constant" pbpush "github.com/openimsdk/protocol/push" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/jsonutil" "google.golang.org/protobuf/proto" ) diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index 23e68339c..8bec6e60b 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -166,7 +166,7 @@ func (k *K8sStaticConsistentHash) GetConnsAndOnlinePush(ctx context.Context, msg } } log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost) - var usersConns = make(map[*grpc.ClientConn][]string) + var usersConns = make(map[grpc.ClientConnInterface][]string) for host, userIds := range usersHost { tconn, _ := k.disCov.GetConn(ctx, host) usersConns[tconn] = userIds diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 826a6e944..23233ef2e 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -14,6 +14,7 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" @@ -25,7 +26,6 @@ import ( "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/timeutil" diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 3e096aa64..2e64c365c 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -192,7 +192,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID return err } for _, v := range conns { - log.ZDebug(ctx, "forceKickOff", "conn", v.Target()) + log.ZDebug(ctx, "forceKickOff", "userID", userID, "platformID", platformID) client := msggateway.NewMsgGatewayClient(v) kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID} _, err := client.KickUserOffline(ctx, kickReq) diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 0731dd8d4..2b66f7a9a 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -49,11 +49,6 @@ func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg. func (m *msgServer) sendMsg(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (*pbmsg.SendMsgResp, error) { m.encapsulateMsgData(req.MsgData) - if req.MsgData.ContentType == constant.Stream { - if err := m.handlerStreamMsg(ctx, req.MsgData); err != nil { - return nil, err - } - } switch req.MsgData.SessionType { case constant.SingleChatType: return m.sendMsgSingleChat(ctx, req, before) diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index 8796fe824..97206dd6d 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -19,11 +19,12 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" - "github.com/openimsdk/open-im-server/v3/pkg/authverify" "path" "strconv" "time" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/google/uuid" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" @@ -37,7 +38,10 @@ import ( ) func (t *thirdServer) PartLimit(ctx context.Context, req *third.PartLimitReq) (*third.PartLimitResp, error) { - limit := t.s3dataBase.PartLimit() + limit, err := t.s3dataBase.PartLimit() + if err != nil { + return nil, err + } return &third.PartLimitResp{ MinPartSize: limit.MinPartSize, MaxPartSize: limit.MaxPartSize, diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index da1c6320e..5932053d0 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -31,7 +31,7 @@ type CronTaskConfig struct { } func Start(ctx context.Context, conf *CronTaskConfig) error { - conf.runTimeEnv = runtimeenv.PrintRuntimeEnvironment() + conf.runTimeEnv = runtimeenv.RuntimeEnvironment() log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", conf.runTimeEnv, "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords) if conf.CronTask.RetainChatRecords < 1 { diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index 0a405fb6e..80912679c 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -86,7 +86,7 @@ func (r *RootCmd) initEtcd() error { return err } disConfig := config.Discovery{} - env := runtimeenv.PrintRuntimeEnvironment() + env := runtimeenv.RuntimeEnvironment() err = config.Load(configDirectory, config.DiscoveryConfigFilename, config.EnvPrefixMap[config.DiscoveryConfigFilename], env, &disConfig) if err != nil { @@ -125,7 +125,7 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err return err } - runtimeEnv := runtimeenv.PrintRuntimeEnvironment() + runtimeEnv := runtimeenv.RuntimeEnvironment() // Load common configuration file //opts.configMap[ShareFileName] = StructEnvPrefix{EnvPrefix: shareEnvPrefix, ConfigStruct: &r.share} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index eb38b64ac..ca448083c 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -18,9 +18,9 @@ import ( "strings" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/s3/aws" "github.com/openimsdk/tools/s3/cos" "github.com/openimsdk/tools/s3/kodo" diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 27aabca95..99df537f7 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -70,7 +70,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf return err } - runTimeEnv := runtimeenv.PrintRuntimeEnvironment() + runTimeEnv := runtimeenv.RuntimeEnvironment() if !autoSetPorts { rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) @@ -177,6 +177,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf } err = client.Register( + ctx, rpcRegisterName, registerIP, port, diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 0069dc7cc..5f924e55a 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -33,12 +33,12 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/protocol/constant" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/datautil" ) diff --git a/pkg/common/storage/controller/msg_transfer.go b/pkg/common/storage/controller/msg_transfer.go index f4c0c6270..15f3b968f 100644 --- a/pkg/common/storage/controller/msg_transfer.go +++ b/pkg/common/storage/controller/msg_transfer.go @@ -9,12 +9,12 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/kafka" "go.mongodb.org/mongo-driver/mongo" ) diff --git a/pkg/common/storage/controller/push.go b/pkg/common/storage/controller/push.go index 91ef126fe..a805eaf00 100644 --- a/pkg/common/storage/controller/push.go +++ b/pkg/common/storage/controller/push.go @@ -19,10 +19,10 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/protocol/push" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/kafka" ) type PushDatabase interface { diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go index 6693d2dde..30d8d20ec 100644 --- a/pkg/common/storage/controller/s3.go +++ b/pkg/common/storage/controller/s3.go @@ -30,7 +30,7 @@ import ( ) type S3Database interface { - PartLimit() *s3.PartLimit + PartLimit() (*s3.PartLimit, error) PartSize(ctx context.Context, size int64) (int64, error) AuthSign(ctx context.Context, uploadID string, partNumbers []int) (*s3.AuthSignResult, error) InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*cont.InitiateUploadResult, error) @@ -65,7 +65,7 @@ func (s *s3Database) PartSize(ctx context.Context, size int64) (int64, error) { return s.s3.PartSize(ctx, size) } -func (s *s3Database) PartLimit() *s3.PartLimit { +func (s *s3Database) PartLimit() (*s3.PartLimit, error) { return s.s3.PartLimit() } diff --git a/pkg/common/storage/kafka/config.go b/pkg/common/storage/kafka/config.go new file mode 100644 index 000000000..1c9c4b0a0 --- /dev/null +++ b/pkg/common/storage/kafka/config.go @@ -0,0 +1,33 @@ +// Copyright © 2024 OpenIM open source community. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +type TLSConfig struct { + EnableTLS bool `yaml:"enableTLS"` + CACrt string `yaml:"caCrt"` + ClientCrt string `yaml:"clientCrt"` + ClientKey string `yaml:"clientKey"` + ClientKeyPwd string `yaml:"clientKeyPwd"` + InsecureSkipVerify bool `yaml:"insecureSkipVerify"` +} + +type Config struct { + Username string `yaml:"username"` + Password string `yaml:"password"` + ProducerAck string `yaml:"producerAck"` + CompressType string `yaml:"compressType"` + Addr []string `yaml:"addr"` + TLS TLSConfig `yaml:"tls"` +} diff --git a/pkg/common/storage/kafka/consumer_group.go b/pkg/common/storage/kafka/consumer_group.go new file mode 100644 index 000000000..f0e84bbc9 --- /dev/null +++ b/pkg/common/storage/kafka/consumer_group.go @@ -0,0 +1,68 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "errors" + + "github.com/IBM/sarama" + "github.com/openimsdk/tools/log" +) + +type MConsumerGroup struct { + sarama.ConsumerGroup + groupID string + topics []string +} + +func NewMConsumerGroup(conf *Config, groupID string, topics []string, autoCommitEnable bool) (*MConsumerGroup, error) { + config, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, autoCommitEnable) + if err != nil { + return nil, err + } + group, err := NewConsumerGroup(config, conf.Addr, groupID) + if err != nil { + return nil, err + } + return &MConsumerGroup{ + ConsumerGroup: group, + groupID: groupID, + topics: topics, + }, nil +} + +func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context { + return GetContextWithMQHeader(cMsg.Headers) +} + +func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) { + for { + err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler) + if errors.Is(err, sarama.ErrClosedConsumerGroup) { + return + } + if errors.Is(err, context.Canceled) { + return + } + if err != nil { + log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID) + } + } +} + +func (mc *MConsumerGroup) Close() error { + return mc.ConsumerGroup.Close() +} diff --git a/pkg/common/storage/kafka/producer.go b/pkg/common/storage/kafka/producer.go new file mode 100644 index 000000000..5f6be29ed --- /dev/null +++ b/pkg/common/storage/kafka/producer.go @@ -0,0 +1,82 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "github.com/IBM/sarama" + "github.com/openimsdk/tools/errs" + "google.golang.org/protobuf/proto" +) + +// Producer represents a Kafka producer. +type Producer struct { + addr []string + topic string + config *sarama.Config + producer sarama.SyncProducer +} + +func NewKafkaProducer(config *sarama.Config, addr []string, topic string) (*Producer, error) { + producer, err := NewProducer(config, addr) + if err != nil { + return nil, err + } + return &Producer{ + addr: addr, + topic: topic, + config: config, + producer: producer, + }, nil +} + +// SendMessage sends a message to the Kafka topic configured in the Producer. +func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) { + // Marshal the protobuf message + bMsg, err := proto.Marshal(msg) + if err != nil { + return 0, 0, errs.WrapMsg(err, "kafka proto Marshal err") + } + if len(bMsg) == 0 { + return 0, 0, errs.WrapMsg(errEmptyMsg, "kafka proto Marshal err") + } + + // Prepare Kafka message + kMsg := &sarama.ProducerMessage{ + Topic: p.topic, + Key: sarama.StringEncoder(key), + Value: sarama.ByteEncoder(bMsg), + } + + // Validate message key and value + if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 { + return 0, 0, errs.Wrap(errEmptyMsg) + } + + // Attach context metadata as headers + header, err := GetMQHeaderWithContext(ctx) + if err != nil { + return 0, 0, err + } + kMsg.Headers = header + + // Send the message + partition, offset, err := p.producer.SendMessage(kMsg) + if err != nil { + return 0, 0, errs.WrapMsg(err, "p.producer.SendMessage error") + } + + return partition, offset, nil +} diff --git a/pkg/common/storage/kafka/sarama.go b/pkg/common/storage/kafka/sarama.go new file mode 100644 index 000000000..23220b4d0 --- /dev/null +++ b/pkg/common/storage/kafka/sarama.go @@ -0,0 +1,85 @@ +package kafka + +import ( + "bytes" + "strings" + + "github.com/IBM/sarama" + "github.com/openimsdk/tools/errs" +) + +func BuildConsumerGroupConfig(conf *Config, initial int64, autoCommitEnable bool) (*sarama.Config, error) { + kfk := sarama.NewConfig() + kfk.Version = sarama.V2_0_0_0 + kfk.Consumer.Offsets.Initial = initial + kfk.Consumer.Offsets.AutoCommit.Enable = autoCommitEnable + kfk.Consumer.Return.Errors = false + if conf.Username != "" || conf.Password != "" { + kfk.Net.SASL.Enable = true + kfk.Net.SASL.User = conf.Username + kfk.Net.SASL.Password = conf.Password + } + if conf.TLS.EnableTLS { + tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify) + if err != nil { + return nil, err + } + kfk.Net.TLS.Config = tls + kfk.Net.TLS.Enable = true + } + return kfk, nil +} + +func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error) { + cg, err := sarama.NewConsumerGroup(addr, groupID, conf) + if err != nil { + return nil, errs.WrapMsg(err, "NewConsumerGroup failed", "addr", addr, "groupID", groupID, "conf", *conf) + } + return cg, nil +} + +func BuildProducerConfig(conf Config) (*sarama.Config, error) { + kfk := sarama.NewConfig() + kfk.Producer.Return.Successes = true + kfk.Producer.Return.Errors = true + kfk.Producer.Partitioner = sarama.NewHashPartitioner + if conf.Username != "" || conf.Password != "" { + kfk.Net.SASL.Enable = true + kfk.Net.SASL.User = conf.Username + kfk.Net.SASL.Password = conf.Password + } + switch strings.ToLower(conf.ProducerAck) { + case "no_response": + kfk.Producer.RequiredAcks = sarama.NoResponse + case "wait_for_local": + kfk.Producer.RequiredAcks = sarama.WaitForLocal + case "wait_for_all": + kfk.Producer.RequiredAcks = sarama.WaitForAll + default: + kfk.Producer.RequiredAcks = sarama.WaitForAll + } + if conf.CompressType == "" { + kfk.Producer.Compression = sarama.CompressionNone + } else { + if err := kfk.Producer.Compression.UnmarshalText(bytes.ToLower([]byte(conf.CompressType))); err != nil { + return nil, errs.WrapMsg(err, "UnmarshalText failed", "compressType", conf.CompressType) + } + } + if conf.TLS.EnableTLS { + tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify) + if err != nil { + return nil, err + } + kfk.Net.TLS.Config = tls + kfk.Net.TLS.Enable = true + } + return kfk, nil +} + +func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error) { + producer, err := sarama.NewSyncProducer(addr, conf) + if err != nil { + return nil, errs.WrapMsg(err, "NewSyncProducer failed", "addr", addr, "conf", *conf) + } + return producer, nil +} diff --git a/pkg/common/storage/kafka/tls.go b/pkg/common/storage/kafka/tls.go new file mode 100644 index 000000000..00c89dcc1 --- /dev/null +++ b/pkg/common/storage/kafka/tls.go @@ -0,0 +1,83 @@ +// Copyright © 2024 OpenIM open source community. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "crypto/tls" + "crypto/x509" + "encoding/pem" + "os" + + "github.com/openimsdk/tools/errs" +) + +// decryptPEM decrypts a PEM block using a password. +func decryptPEM(data []byte, passphrase []byte) ([]byte, error) { + if len(passphrase) == 0 { + return data, nil + } + b, _ := pem.Decode(data) + d, err := x509.DecryptPEMBlock(b, passphrase) + if err != nil { + return nil, errs.WrapMsg(err, "DecryptPEMBlock failed") + } + return pem.EncodeToMemory(&pem.Block{ + Type: b.Type, + Bytes: d, + }), nil +} + +func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, errs.WrapMsg(err, "ReadFile failed", "path", path) + } + return decryptPEM(data, pwd) +} + +// newTLSConfig setup the TLS config from general config file. +func newTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte, insecureSkipVerify bool) (*tls.Config, error) { + var tlsConfig tls.Config + if clientCertFile != "" && clientKeyFile != "" { + certPEMBlock, err := os.ReadFile(clientCertFile) + if err != nil { + return nil, errs.WrapMsg(err, "ReadFile failed", "clientCertFile", clientCertFile) + } + keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd) + if err != nil { + return nil, err + } + + cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock) + if err != nil { + return nil, errs.WrapMsg(err, "X509KeyPair failed") + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + + if caCertFile != "" { + caCert, err := os.ReadFile(caCertFile) + if err != nil { + return nil, errs.WrapMsg(err, "ReadFile failed", "caCertFile", caCertFile) + } + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { + return nil, errs.New("AppendCertsFromPEM failed") + } + tlsConfig.RootCAs = caCertPool + } + tlsConfig.InsecureSkipVerify = insecureSkipVerify + return &tlsConfig, nil +} diff --git a/pkg/common/storage/kafka/util.go b/pkg/common/storage/kafka/util.go new file mode 100644 index 000000000..61abe5450 --- /dev/null +++ b/pkg/common/storage/kafka/util.go @@ -0,0 +1,34 @@ +package kafka + +import ( + "context" + "errors" + "github.com/IBM/sarama" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/tools/mcontext" +) + +var errEmptyMsg = errors.New("kafka binary msg is empty") + +// GetMQHeaderWithContext extracts message queue headers from the context. +func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) { + operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx) + if err != nil { + return nil, err + } + return []sarama.RecordHeader{ + {Key: []byte(constant.OperationID), Value: []byte(operationID)}, + {Key: []byte(constant.OpUserID), Value: []byte(opUserID)}, + {Key: []byte(constant.OpUserPlatform), Value: []byte(platform)}, + {Key: []byte(constant.ConnID), Value: []byte(connID)}, + }, nil +} + +// GetContextWithMQHeader creates a context from message queue headers. +func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context { + var values []string + for _, recordHeader := range header { + values = append(values, string(recordHeader.Value)) + } + return mcontext.WithMustInfoCtx(values) // Attach extracted values to context +} diff --git a/pkg/common/storage/kafka/verify.go b/pkg/common/storage/kafka/verify.go new file mode 100644 index 000000000..0a09eed4e --- /dev/null +++ b/pkg/common/storage/kafka/verify.go @@ -0,0 +1,79 @@ +// Copyright © 2024 OpenIM open source community. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "fmt" + + "github.com/IBM/sarama" + "github.com/openimsdk/tools/errs" +) + +func CheckTopics(ctx context.Context, conf *Config, topics []string) error { + kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false) + if err != nil { + return err + } + cli, err := sarama.NewClient(conf.Addr, kfk) + if err != nil { + return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf)) + } + defer cli.Close() + + existingTopics, err := cli.Topics() + if err != nil { + return errs.WrapMsg(err, "Failed to list topics") + } + + existingTopicsMap := make(map[string]bool) + for _, t := range existingTopics { + existingTopicsMap[t] = true + } + + for _, topic := range topics { + if !existingTopicsMap[topic] { + return errs.New("topic not exist", "topic", topic).Wrap() + } + } + return nil +} + +func CheckHealth(ctx context.Context, conf *Config) error { + kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false) + if err != nil { + return err + } + cli, err := sarama.NewClient(conf.Addr, kfk) + if err != nil { + return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf)) + } + defer cli.Close() + + // Get broker list + brokers := cli.Brokers() + if len(brokers) == 0 { + return errs.New("no brokers found").Wrap() + } + + // Check if all brokers are reachable + for _, broker := range brokers { + if err := broker.Open(kfk); err != nil { + return errs.WrapMsg(err, "failed to open broker", "broker", broker.Addr()) + } + } + + return nil +} diff --git a/tools/check-component/main.go b/tools/check-component/main.go index 9df0da7de..ad53c1921 100644 --- a/tools/check-component/main.go +++ b/tools/check-component/main.go @@ -25,11 +25,11 @@ import ( "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/zookeeper" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/s3/minio" "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/runtimeenv" @@ -84,7 +84,7 @@ func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka, discovery = &config.Discovery{} thirdConfig = &config.Third{} ) - runtimeEnv := runtimeenv.PrintRuntimeEnvironment() + runtimeEnv := runtimeenv.RuntimeEnvironment() err := config.Load(configDir, config.MongodbConfigFileName, config.EnvPrefixMap[config.MongodbConfigFileName], runtimeEnv, mongoConfig) if err != nil { diff --git a/tools/seq/internal/main.go b/tools/seq/internal/main.go index 574e7cef9..7e5d5598c 100644 --- a/tools/seq/internal/main.go +++ b/tools/seq/internal/main.go @@ -43,7 +43,7 @@ const ( ) func readConfig[T any](dir string, name string) (*T, error) { - if runtimeenv.PrintRuntimeEnvironment() == config.KUBERNETES { + if runtimeenv.RuntimeEnvironment() == config.KUBERNETES { dir = os.Getenv(config.MountConfigFilePath) } v := viper.New() From 22ba315acd006f312ab418957f2896a9e6e8a100 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 12 Mar 2025 12:07:31 +0800 Subject: [PATCH 9/9] fix: solve unocrrect invite notification (#3220) --- internal/rpc/group/group.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index bea0e1af4..602c4f3ee 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -17,13 +17,14 @@ package group import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "math/big" "math/rand" "strconv" "strings" "time" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -392,6 +393,8 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite if err := g.PopulateGroupMember(ctx, groupMember); err != nil { return nil, err } + } else { + opUserID = mcontext.GetOpUserID(ctx) } if err := g.webhookBeforeInviteUserToGroup(ctx, &g.config.WebhooksConfig.BeforeInviteUserToGroup, req); err != nil && err != servererrs.ErrCallbackContinue { @@ -427,6 +430,7 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite } } } + var groupMembers []*model.GroupMember for _, userID := range req.InvitedUserIDs { member := &model.GroupMember{