|
|
|
@ -28,6 +28,7 @@ import (
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil"
|
|
|
|
|
"github.com/openimsdk/protocol/constant"
|
|
|
|
|
pbchat "github.com/openimsdk/protocol/msg"
|
|
|
|
|
"github.com/openimsdk/protocol/msggateway"
|
|
|
|
|
pbpush "github.com/openimsdk/protocol/push"
|
|
|
|
|
"github.com/openimsdk/protocol/sdkws"
|
|
|
|
|
"github.com/openimsdk/tools/discovery"
|
|
|
|
@ -128,16 +129,11 @@ func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim s
|
|
|
|
|
|
|
|
|
|
// Push2User Suitable for two types of conversations, one is SingleChatType and the other is NotificationChatType.
|
|
|
|
|
func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) (err error) {
|
|
|
|
|
userIDs, err = c.onlineCache.GetUsersOnline(ctx, userIDs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
|
|
|
|
|
if err := c.webhookBeforeOnlinePush(ctx, &c.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
wsResults, err := c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, userIDs)
|
|
|
|
|
wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, userIDs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -186,6 +182,38 @@ func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgDat
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) ([]*msggateway.SingleMsgToUserResults, error) {
|
|
|
|
|
var (
|
|
|
|
|
onlineUserIDs []string
|
|
|
|
|
offlineUserIDs []string
|
|
|
|
|
)
|
|
|
|
|
for _, userID := range pushToUserIDs {
|
|
|
|
|
online, err := c.onlineCache.GetUserOnline(ctx, userID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if online {
|
|
|
|
|
onlineUserIDs = append(onlineUserIDs, userID)
|
|
|
|
|
} else {
|
|
|
|
|
offlineUserIDs = append(offlineUserIDs, userID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
var result []*msggateway.SingleMsgToUserResults
|
|
|
|
|
if len(onlineUserIDs) > 0 {
|
|
|
|
|
var err error
|
|
|
|
|
result, err = c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for _, userID := range offlineUserIDs {
|
|
|
|
|
result = append(result, &msggateway.SingleMsgToUserResults{
|
|
|
|
|
UserID: userID,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
return result, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
|
|
|
|
|
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
|
|
|
|
|
var pushToUserIDs []string
|
|
|
|
@ -199,7 +227,7 @@ func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *s
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
wsResults, err := c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
|
|
|
|
|
wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -240,8 +268,7 @@ func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *s
|
|
|
|
|
}
|
|
|
|
|
func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID string, pushToUserIDs *[]string, msg *sdkws.MsgData) (err error) {
|
|
|
|
|
if len(*pushToUserIDs) == 0 {
|
|
|
|
|
//*pushToUserIDs, err = c.groupLocalCache.GetGroupMemberIDs(ctx, groupID)
|
|
|
|
|
*pushToUserIDs, err = c.onlineCache.GetGroupOnline(ctx, groupID) //
|
|
|
|
|
*pushToUserIDs, err = c.groupLocalCache.GetGroupMemberIDs(ctx, groupID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|