diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index a2c7ba05d..40e4c6e35 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -16,6 +16,8 @@ package msgtransfer import ( "context" + "github.com/openimsdk/tools/utils/idutil" + "github.com/openimsdk/tools/utils/stringutil" "strconv" "strings" "sync" @@ -34,7 +36,6 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/utils" "google.golang.org/protobuf/proto" ) @@ -347,7 +348,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { log.ZDebug(ctx, "generate map list users len", "length", len(aggregationMsgs)) for uniqueKey, v := range aggregationMsgs { if len(v) >= 0 { - hashCode := utils.GetHashCode(uniqueKey) + hashCode := stringutil.GetHashCode(uniqueKey) channelID := hashCode % ChannelNum newCtx := withAggregationCtx(ctx, v) log.ZDebug( @@ -438,7 +439,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim( rwLock.Unlock() start := time.Now() - ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator()) + ctx := mcontext.WithTriggerIDContext(context.Background(), idutil.OperationIDGenerator()) log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(buffer)) for i := 0; i < len(buffer)/split; i++ { och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ diff --git a/internal/push/callback.go b/internal/push/callback.go index 7ce342568..37e57dc61 100644 --- a/internal/push/callback.go +++ b/internal/push/callback.go @@ -16,6 +16,7 @@ package push import ( "context" + "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -23,7 +24,6 @@ import ( "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/utils" ) func callbackOfflinePush(ctx context.Context, callback *config.Callback, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error { @@ -65,7 +65,7 @@ func callbackOfflinePush(ctx context.Context, callback *config.Callback, userIDs } func callbackOnlinePush(ctx context.Context, callback *config.Callback, userIDs []string, msg *sdkws.MsgData) error { - if !callback.CallbackOnlinePush.Enable || utils.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing { + if !callback.CallbackOnlinePush.Enable || datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing { return nil } req := callbackstruct.CallbackBeforePushReq{ diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index ef666d1fb..647082966 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -16,6 +16,8 @@ package push import ( "context" + "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/timeutil" "github.com/IBM/sarama" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -24,7 +26,6 @@ import ( pbchat "github.com/openimsdk/protocol/msg" pbpush "github.com/openimsdk/protocol/push" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/utils" "google.golang.org/protobuf/proto" ) @@ -55,7 +56,7 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { ConversationID: msgFromMQ.ConversationID, } sec := msgFromMQ.MsgData.SendTime / 1000 - nowSec := utils.GetCurrentTimestampBySecond() + nowSec := timeutil.GetCurrentTimestampBySecond() log.ZDebug(ctx, "push msg", "msg", pbData.String(), "sec", sec, "nowSec", nowSec) if nowSec-sec > 10 { return @@ -66,7 +67,7 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData) default: var pushUserIDList []string - isSenderSync := utils.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync) + isSenderSync := datautil.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync) if !isSenderSync || pbData.MsgData.SendID == pbData.MsgData.RecvID { pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID) } else { diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 17cd75f80..6f5a28b18 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -16,6 +16,8 @@ package push import ( "context" + "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" @@ -24,9 +26,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/constant" pbpush "github.com/openimsdk/protocol/push" - "github.com/openimsdk/tools/discoveryregistry" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/utils" "google.golang.org/grpc" ) @@ -34,7 +34,7 @@ type pushServer struct { pusher *Pusher } -func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { +func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { rdb, err := cache.NewRedis(ctx, &config.Redis) if err != nil { return err @@ -80,7 +80,7 @@ func (r *pushServer) PushMsg(ctx context.Context, pbData *pbpush.PushMsgReq) (re err = r.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData) default: var pushUserIDList []string - isSenderSync := utils.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync) + isSenderSync := datautil.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync) if !isSenderSync { pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID) } else { diff --git a/pkg/util/conversationutil/conversationutil.go b/pkg/util/conversationutil/conversationutil.go new file mode 100644 index 000000000..5683d8df8 --- /dev/null +++ b/pkg/util/conversationutil/conversationutil.go @@ -0,0 +1,46 @@ +package conversationutil + +import ( + "sort" + "strings" +) + +func GenConversationIDForSingle(sendID, recvID string) string { + l := []string{sendID, recvID} + sort.Strings(l) + return "si_" + strings.Join(l, "_") +} + +func GenConversationUniqueKeyForGroup(groupID string) string { + return groupID +} + +func GenGroupConversationID(groupID string) string { + return "sg_" + groupID +} + +func GenConversationUniqueKeyForSingle(sendID, recvID string) string { + l := []string{sendID, recvID} + sort.Strings(l) + return strings.Join(l, "_") +} + +func GetNotificationConversationIDByConversationID(conversationID string) string { + l := strings.Split(conversationID, "_") + if len(l) > 1 { + l[0] = "n" + return strings.Join(l, "_") + } + return "" +} + +func GetSelfNotificationConversationID(userID string) string { + return "n_" + userID + "_" + userID +} + +func GetSeqsBeginEnd(seqs []int64) (int64, int64) { + if len(seqs) == 0 { + return 0, 0 + } + return seqs[0], seqs[len(seqs)-1] +}