fix: api send messages for notification conversation. (#1254)

* fix: to start im or chat, ZooKeeper must be started first.

* fix: msg gateway start output err info

Signed-off-by: Gordon <1432970085@qq.com>

* fix: msg gateway start output err info

Signed-off-by: Gordon <1432970085@qq.com>

* chore: package path changes

Signed-off-by: withchao <993506633@qq.com>

* fix: go mod update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* chore: package path changes

Signed-off-by: withchao <993506633@qq.com>

* chore: package path changes

Signed-off-by: withchao <993506633@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: get all userID

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: msggateway add online status call

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: log change

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: log change

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* chore: network mode change

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* feat: add api of get server time

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* feat: remove go work sum

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: pull message add isRead field

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: check msg-transfer script

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: start don't kill old process

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: check component

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: pull message set isRead only message come from single.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: add ex field to update group info.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

* cicd: robot automated Change

* refactor: change project module name.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: change project module name.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: change project module name.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

* test: for pressure test.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* test: for pressure test.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* test: for pressure test.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* test: message log.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

* fxi: component check output valid info.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fxi: component check output valid info.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* test: send message test log.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

* cicd: robot automated Change

* test: remove info log.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* feat: api of send message add sendTime field.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: add callback for update user's info.

* cicd: robot automated Change

* fix: change callback command name.

* cicd: robot automated Change

* fix: single chat unread status change.

* fix: single chat unread status change.

* fix: single chat unread status change.

* fix: user status change.

* cicd: robot automated Change

* fix: user status change.

* fix: user status change.

* fix: user status change.

* cicd: robot automated Change

* fix: ws close when user logout.

* fix: remove repeat platform on online status.

* cicd: robot automated Change

* fix: api send messages for notification conversation .

* fix: api send messages for notification conversation .

* fix: api send messages for notification conversation .

* fix: api send messages for notification conversation .

* fix: api send messages for notification conversation .

* fix: api send messages for notification conversation.

* fix: api send messages for notification conversation.

* fix: api send messages for notification conversation.

* fix: api send messages for notification conversation.

* fix: api send messages for notification conversation.

* fix: api send messages for notification conversation.

* re: remove router of unsubscribeStatus.

* re: remove router of unsubscribeStatus.

* re: remove router of unsubscribeStatus.

* re: remove router of unsubscribeStatus.

---------

Signed-off-by: Gordon <1432970085@qq.com>
Signed-off-by: withchao <993506633@qq.com>
Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>
Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: withchao <993506633@qq.com>
Co-authored-by: Xinwei Xiong <3293172751NSS@gmail.com>
Co-authored-by: FGadvancer <FGadvancer@users.noreply.github.com>
pull/1282/head
Gordon 1 year ago committed by GitHub
parent 6ffcfbe68e
commit e4f3e34249
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -171,7 +171,7 @@ linters-settings:
# exclude_godoc_examples: false # exclude_godoc_examples: false
funlen: funlen:
lines: 150 lines: 150
statements: 50 statements: 80
gci: gci:
# put imports beginning with prefix after 3rd-party packages; # put imports beginning with prefix after 3rd-party packages;
# only support one prefix # only support one prefix

@ -32,7 +32,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
) )
func main() { func main() {
apiCmd := cmd.NewApiCmd() apiCmd := cmd.NewApiCmd()
apiCmd.AddPortFlag() apiCmd.AddPortFlag()

@ -57,6 +57,10 @@ func (m MessageApi) newUserSendMsgReq(c *gin.Context, params *apistruct.SendMsg)
var newContent string var newContent string
options := make(map[string]bool, 5) options := make(map[string]bool, 5)
switch params.ContentType { switch params.ContentType {
case constant.OANotification:
notification := sdkws.NotificationElem{}
notification.Detail = utils.StructToJsonString(params.Content)
newContent = utils.StructToJsonString(&notification)
case constant.Text: case constant.Text:
fallthrough fallthrough
case constant.Picture: case constant.Picture:
@ -69,10 +73,6 @@ func (m MessageApi) newUserSendMsgReq(c *gin.Context, params *apistruct.SendMsg)
fallthrough fallthrough
case constant.File: case constant.File:
fallthrough fallthrough
case constant.CustomNotTriggerConversation:
fallthrough
case constant.CustomOnlineOnly:
fallthrough
default: default:
newContent = utils.StructToJsonString(params.Content) newContent = utils.StructToJsonString(params.Content)
} }
@ -82,11 +82,6 @@ func (m MessageApi) newUserSendMsgReq(c *gin.Context, params *apistruct.SendMsg)
if params.NotOfflinePush { if params.NotOfflinePush {
utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false) utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false)
} }
if params.ContentType == constant.CustomOnlineOnly {
m.SetOptions(options, false)
} else if params.ContentType == constant.CustomNotTriggerConversation {
utils.SetSwitchFromOptions(options, constant.IsConversationUpdate, false)
}
pbData := msg.SendMsgReq{ pbData := msg.SendMsgReq{
MsgData: &sdkws.MsgData{ MsgData: &sdkws.MsgData{
SendID: params.SendID, SendID: params.SendID,
@ -105,14 +100,6 @@ func (m MessageApi) newUserSendMsgReq(c *gin.Context, params *apistruct.SendMsg)
OfflinePushInfo: params.OfflinePushInfo, OfflinePushInfo: params.OfflinePushInfo,
}, },
} }
//if params.ContentType == constant.OANotification {
// var tips sdkws.TipsComm
// tips.JsonDetail = utils.StructToJsonString(params.Content)
// pbData.MsgData.Content, err = proto.Marshal(&tips)
// if err != nil {
// log.ZError(c, "Marshal failed ", err, "tips", tips.String())
// }
//}
return &pbData return &pbData
} }
@ -180,15 +167,13 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
data = apistruct.FileElem{} data = apistruct.FileElem{}
case constant.Custom: case constant.Custom:
data = apistruct.CustomElem{} data = apistruct.CustomElem{}
case constant.Revoke:
data = apistruct.RevokeElem{}
case constant.OANotification: case constant.OANotification:
data = apistruct.OANotificationElem{} data = apistruct.OANotificationElem{}
req.SessionType = constant.NotificationChatType req.SessionType = constant.NotificationChatType
case constant.CustomNotTriggerConversation: if !authverify.IsManagerUserID(req.SendID) {
data = apistruct.CustomElem{} return nil, errs.ErrNoPermission.
case constant.CustomOnlineOnly: Wrap("only app manager can as sender send OANotificationElem")
data = apistruct.CustomElem{} }
default: default:
return nil, errs.ErrArgs.WithDetail("not support err contentType") return nil, errs.ErrArgs.WithDetail("not support err contentType")
} }
@ -212,7 +197,6 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
apiresp.GinError(c, errs.ErrNoPermission.Wrap("only app manager can send message")) apiresp.GinError(c, errs.ErrNoPermission.Wrap("only app manager can send message"))
return return
} }
sendMsgReq, err := m.getSendMsgReq(c, req.SendMsg) sendMsgReq, err := m.getSendMsgReq(c, req.SendMsg)
if err != nil { if err != nil {
log.ZError(c, "decodeData failed", err) log.ZError(c, "decodeData failed", err)

@ -83,7 +83,6 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
userRouterGroup.POST("/get_users_online_status", ParseToken, u.GetUsersOnlineStatus) userRouterGroup.POST("/get_users_online_status", ParseToken, u.GetUsersOnlineStatus)
userRouterGroup.POST("/get_users_online_token_detail", ParseToken, u.GetUsersOnlineTokenDetail) userRouterGroup.POST("/get_users_online_token_detail", ParseToken, u.GetUsersOnlineTokenDetail)
userRouterGroup.POST("/subscribe_users_status", ParseToken, u.SubscriberStatus) userRouterGroup.POST("/subscribe_users_status", ParseToken, u.SubscriberStatus)
userRouterGroup.POST("/unsubscribe_users_status", ParseToken, u.UnSubscriberStatus)
userRouterGroup.POST("/get_users_status", ParseToken, u.GetUserStatus) userRouterGroup.POST("/get_users_status", ParseToken, u.GetUserStatus)
userRouterGroup.POST("/get_subscribe_users_status", ParseToken, u.GetSubscribeUsersStatus) userRouterGroup.POST("/get_subscribe_users_status", ParseToken, u.GetSubscribeUsersStatus)
} }

@ -190,11 +190,6 @@ func (u *UserApi) SubscriberStatus(c *gin.Context) {
a2r.Call(user.UserClient.SubscribeOrCancelUsersStatus, u.Client, c) a2r.Call(user.UserClient.SubscribeOrCancelUsersStatus, u.Client, c)
} }
// UnSubscriberStatus Unsubscribe a user's presence.
func (u *UserApi) UnSubscriberStatus(c *gin.Context) {
a2r.Call(user.UserClient.SubscribeOrCancelUsersStatus, u.Client, c)
}
// GetUserStatus Get the online status of the user. // GetUserStatus Get the online status of the user.
func (u *UserApi) GetUserStatus(c *gin.Context) { func (u *UserApi) GetUserStatus(c *gin.Context) {
a2r.Call(user.UserClient.GetUserStatus, u.Client, c) a2r.Call(user.UserClient.GetUserStatus, u.Client, c)

@ -16,12 +16,13 @@ package msgtransfer
import ( import (
"fmt" "fmt"
"sync" "sync"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
"github.com/OpenIMSDK/tools/mw" "github.com/OpenIMSDK/tools/mw"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -30,7 +31,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/relation"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
"github.com/openimsdk/open-im-server/v3/pkg/common/prome" "github.com/openimsdk/open-im-server/v3/pkg/common/prome"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
) )

@ -283,20 +283,30 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(
return return
} }
if isNewConversation { if isNewConversation {
if storageList[0].SessionType == constant.SuperGroupChatType { switch storageList[0].SessionType {
log.ZInfo(ctx, "group chat first create conversation", "conversationID", conversationID) case constant.SuperGroupChatType:
log.ZInfo(ctx, "group chat first create conversation", "conversationID",
conversationID)
userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, storageList[0].GroupID) userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, storageList[0].GroupID)
if err != nil { if err != nil {
log.ZWarn(ctx, "get group member ids error", err, "conversationID", conversationID) log.ZWarn(ctx, "get group member ids error", err, "conversationID",
conversationID)
} else { } else {
if err := och.conversationRpcClient.GroupChatFirstCreateConversation(ctx, storageList[0].GroupID, userIDs); err != nil { if err := och.conversationRpcClient.GroupChatFirstCreateConversation(ctx,
log.ZWarn(ctx, "single chat first create conversation error", err, "conversationID", conversationID) storageList[0].GroupID, userIDs); err != nil {
log.ZWarn(ctx, "single chat first create conversation error", err,
"conversationID", conversationID)
} }
} }
} else { case constant.SingleChatType, constant.NotificationChatType:
if err := och.conversationRpcClient.SingleChatFirstCreateConversation(ctx, storageList[0].RecvID, storageList[0].SendID); err != nil { if err := och.conversationRpcClient.SingleChatFirstCreateConversation(ctx, storageList[0].RecvID,
log.ZWarn(ctx, "single chat first create conversation error", err, "conversationID", conversationID) storageList[0].SendID, conversationID, storageList[0].SessionType); err != nil {
log.ZWarn(ctx, "single chat or notification first create conversation error", err,
"conversationID", conversationID, "sessionType", storageList[0].SessionType)
} }
default:
log.ZWarn(ctx, "unknown session type", nil, "sessionType",
storageList[0].SessionType)
} }
} }

@ -62,7 +62,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(
log.ZError(ctx, "msgFromMQ.MsgData is empty", nil, "cMsg", cMsg) log.ZError(ctx, "msgFromMQ.MsgData is empty", nil, "cMsg", cMsg)
return return
} }
log.ZInfo(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.MsgData) log.ZInfo(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String())
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
if err != nil { if err != nil {
log.ZError( log.ZError(

@ -58,6 +58,7 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
} }
sec := msgFromMQ.MsgData.SendTime / 1000 sec := msgFromMQ.MsgData.SendTime / 1000
nowSec := utils.GetCurrentTimestampBySecond() nowSec := utils.GetCurrentTimestampBySecond()
log.ZDebug(ctx, "push msg", "msg", pbData.String(), "sec", sec, "nowSec", nowSec)
if nowSec-sec > 10 { if nowSec-sec > 10 {
return return
} }

@ -126,13 +126,12 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg
} }
func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t interface{}) error { func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t interface{}) error {
var notificationElem struct { var notification sdkws.NotificationElem
Detail string `json:"detail,omitempty"` if err := json.Unmarshal(bytes, &notification); err != nil {
}
if err := json.Unmarshal(bytes, &notificationElem); err != nil {
return err return err
} }
return json.Unmarshal([]byte(notificationElem.Detail), t)
return json.Unmarshal([]byte(notification.Detail), t)
} }
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {

@ -17,8 +17,6 @@ package conversation
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
@ -114,7 +112,10 @@ func (c *conversationServer) SetConversation(ctx context.Context, req *pbconvers
return resp, nil return resp, nil
} }
func (c *conversationServer) SetConversations(ctx context.Context, req *pbconversation.SetConversationsReq) (*pbconversation.SetConversationsResp, error) { //nolint
func (c *conversationServer) SetConversations(ctx context.Context,
req *pbconversation.SetConversationsReq,
) (*pbconversation.SetConversationsResp, error) {
if req.Conversation == nil { if req.Conversation == nil {
return nil, errs.ErrArgs.Wrap("conversation must not be nil") return nil, errs.ErrArgs.Wrap("conversation must not be nil")
} }
@ -124,14 +125,8 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
return nil, err return nil, err
} }
if groupInfo.Status == constant.GroupStatusDismissed { if groupInfo.Status == constant.GroupStatusDismissed {
return nil, err return nil, errs.ErrDismissedAlready.Wrap("group dismissed")
} }
// for _, userID := range req.UserIDs {
// if _, err := c.groupRpcClient.GetGroupMemberCache(ctx, req.Conversation.GroupID, userID); err != nil {
// log.ZError(ctx, "user not in group", err, "userID", userID, "groupID", req.Conversation.GroupID)
// return nil, err
// }
// }
} }
var unequal int var unequal int
var conv tablerelation.ConversationModel var conv tablerelation.ConversationModel
@ -205,7 +200,14 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
return nil, err return nil, err
} }
for _, userID := range req.UserIDs { for _, userID := range req.UserIDs {
c.conversationNotificationSender.ConversationSetPrivateNotification(ctx, userID, req.Conversation.UserID, req.Conversation.IsPrivateChat.Value, req.Conversation.ConversationID) err := c.conversationNotificationSender.ConversationSetPrivateNotification(ctx, userID, req.Conversation.UserID,
req.Conversation.IsPrivateChat.Value, req.Conversation.ConversationID)
if err != nil {
log.ZWarn(ctx, "send conversation set private notification failed", err,
"userID", userID, "conversationID", req.Conversation.ConversationID)
continue
}
} }
} }
if req.Conversation.BurnDuration != nil { if req.Conversation.BurnDuration != nil {
@ -235,24 +237,40 @@ func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req
} }
// create conversation without notification for msg redis transfer. // create conversation without notification for msg redis transfer.
func (c *conversationServer) CreateSingleChatConversations(ctx context.Context, req *pbconversation.CreateSingleChatConversationsReq) (*pbconversation.CreateSingleChatConversationsResp, error) { func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
var conversation tablerelation.ConversationModel req *pbconversation.CreateSingleChatConversationsReq,
conversation.ConversationID = msgprocessor.GetConversationIDBySessionType(constant.SingleChatType, req.RecvID, req.SendID) ) (*pbconversation.CreateSingleChatConversationsResp, error) {
conversation.ConversationType = constant.SingleChatType switch req.ConversationType {
conversation.OwnerUserID = req.SendID case constant.SingleChatType:
conversation.UserID = req.RecvID var conversation tablerelation.ConversationModel
err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.ConversationModel{&conversation}) conversation.ConversationID = req.ConversationID
if err != nil { conversation.ConversationType = req.ConversationType
log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation) conversation.OwnerUserID = req.SendID
} conversation.UserID = req.RecvID
err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.ConversationModel{&conversation})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation)
}
conversation2 := conversation conversation2 := conversation
conversation2.OwnerUserID = req.RecvID conversation2.OwnerUserID = req.RecvID
conversation2.UserID = req.SendID conversation2.UserID = req.SendID
err = c.conversationDatabase.CreateConversation(ctx, []*tablerelation.ConversationModel{&conversation2}) err = c.conversationDatabase.CreateConversation(ctx, []*tablerelation.ConversationModel{&conversation2})
if err != nil { if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation) log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
}
case constant.NotificationChatType:
var conversation tablerelation.ConversationModel
conversation.ConversationID = req.ConversationID
conversation.ConversationType = req.ConversationType
conversation.OwnerUserID = req.RecvID
conversation.UserID = req.SendID
err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.ConversationModel{&conversation})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
}
} }
return &pbconversation.CreateSingleChatConversationsResp{}, nil return &pbconversation.CreateSingleChatConversationsResp{}, nil
} }

@ -252,7 +252,8 @@ func (s *friendServer) GetDesignatedFriends(
return resp, nil return resp, nil
} }
func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context, req *pbfriend.GetDesignatedFriendsApplyReq) (resp *pbfriend.GetDesignatedFriendsApplyResp, err error) { func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context,
req *pbfriend.GetDesignatedFriendsApplyReq) (resp *pbfriend.GetDesignatedFriendsApplyResp, err error) {
friendRequests, err := s.friendDatabase.FindBothFriendRequests(ctx, req.FromUserID, req.ToUserID) friendRequests, err := s.friendDatabase.FindBothFriendRequests(ctx, req.FromUserID, req.ToUserID)
if err != nil { if err != nil {
return nil, err return nil, err

@ -147,7 +147,6 @@ func (m *msgServer) MarkConversationAsRead(
for i := hasReadSeq + 1; i <= req.HasReadSeq; i++ { for i := hasReadSeq + 1; i <= req.HasReadSeq; i++ {
seqs = append(seqs, i) seqs = append(seqs, i)
} }
if len(seqs) > 0 { if len(seqs) > 0 {
log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID) log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID)
if err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil { if err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil {
@ -165,7 +164,9 @@ func (m *msgServer) MarkConversationAsRead(
m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq); err != nil { m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq); err != nil {
return nil, err return nil, err
} }
} else if conversation.ConversationType == constant.SuperGroupChatType {
} else if conversation.ConversationType == constant.SuperGroupChatType ||
conversation.ConversationType == constant.NotificationChatType {
if req.HasReadSeq > hasReadSeq { if req.HasReadSeq > hasReadSeq {
err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq) err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq)
if err != nil { if err != nil {

@ -23,6 +23,8 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/mcontext"
@ -35,7 +37,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
) )
@ -77,7 +78,8 @@ func InitMsgTool() (*MsgTool, error) {
/* /*
discov, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, discov, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
zookeeper.WithFreq(time.Hour), zookeeper.WithRoundRobin(), zookeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, zookeeper.WithFreq(time.Hour), zookeeper.WithRoundRobin(), zookeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
config.Config.Zookeeper.Password), zookeeper.WithTimeout(10), zookeeper.WithLogger(log.NewZkLogger()))*/if err != nil { config.Config.Zookeeper.Password), zookeeper.WithTimeout(10), zookeeper.WithLogger(log.NewZkLogger()))*/
if err != nil {
return nil, err return nil, err
} }
discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))

@ -41,7 +41,7 @@ type SendMsgReq struct {
type BatchSendMsgReq struct { type BatchSendMsgReq struct {
SendMsg SendMsg
IsSendAll bool `json:"isSendAll"` IsSendAll bool `json:"isSendAll"`
RecvIDs []string `json:"recvIDs"` RecvIDs []string `json:"recvIDs" binding:"required"`
} }
type BatchSendMsgResp struct { type BatchSendMsgResp struct {

@ -39,7 +39,6 @@ func NewConversation(discov discoveryregistry.SvcDiscoveryRegistry) *Conversatio
panic(err) panic(err)
} }
client := pbconversation.NewConversationClient(conn) client := pbconversation.NewConversationClient(conn)
return &Conversation{discov: discov, conn: conn, Client: client} return &Conversation{discov: discov, conn: conn, Client: client}
} }
@ -57,13 +56,17 @@ func (c *ConversationRpcClient) GetSingleConversationRecvMsgOpt(ctx context.Cont
if err != nil { if err != nil {
return 0, err return 0, err
} }
return conversation.GetConversation().RecvMsgOpt, err return conversation.GetConversation().RecvMsgOpt, err
} }
func (c *ConversationRpcClient) SingleChatFirstCreateConversation(ctx context.Context, recvID, sendID string) error { func (c *ConversationRpcClient) SingleChatFirstCreateConversation(ctx context.Context, recvID, sendID,
_, err := c.Client.CreateSingleChatConversations(ctx, &pbconversation.CreateSingleChatConversationsReq{RecvID: recvID, SendID: sendID}) conversationID string, conversationType int32,
) error {
_, err := c.Client.CreateSingleChatConversations(ctx,
&pbconversation.CreateSingleChatConversationsReq{
RecvID: recvID, SendID: sendID, ConversationID: conversationID,
ConversationType: conversationType,
})
return err return err
} }

@ -221,12 +221,13 @@ func WithRpcGetUserName() NotificationOptions {
} }
} }
func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, sendID, recvID string, contentType, sesstionType int32, m proto.Message, opts ...NotificationOptions) (err error) { func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, sendID, recvID string,
contentType, sesstionType int32, m proto.Message, opts ...NotificationOptions) (err error) {
n := sdkws.NotificationElem{Detail: utils.StructToJsonString(m)} n := sdkws.NotificationElem{Detail: utils.StructToJsonString(m)}
content, err := json.Marshal(&n) content, err := json.Marshal(&n)
if err != nil { if err != nil {
log.ZError(ctx, "MsgClient Notification json.Marshal failed", err, "sendID", sendID, "recvID", recvID, "contentType", contentType, "msg", m) log.ZError(ctx, "MsgClient Notification json.Marshal failed", err, "sendID",
sendID, "recvID", recvID, "contentType", contentType, "msg", m)
return err return err
} }
notificationOpt := &notificationOpt{} notificationOpt := &notificationOpt{}
@ -235,8 +236,8 @@ func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, s
} }
var req msg.SendMsgReq var req msg.SendMsgReq
var msg sdkws.MsgData var msg sdkws.MsgData
var userInfo *sdkws.UserInfo
if notificationOpt.WithRpcGetUsername && s.getUserInfo != nil { if notificationOpt.WithRpcGetUsername && s.getUserInfo != nil {
var userInfo *sdkws.UserInfo
userInfo, err = s.getUserInfo(ctx, sendID) userInfo, err = s.getUserInfo(ctx, sendID)
if err != nil { if err != nil {
log.ZWarn(ctx, "getUserInfo failed", err, "sendID", sendID) log.ZWarn(ctx, "getUserInfo failed", err, "sendID", sendID)
@ -259,7 +260,7 @@ func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, s
msg.CreateTime = utils.GetCurrentTimestampByMill() msg.CreateTime = utils.GetCurrentTimestampByMill()
msg.ClientMsgID = utils.GetMsgID(sendID) msg.ClientMsgID = utils.GetMsgID(sendID)
optionsConfig := s.contentTypeConf[contentType] optionsConfig := s.contentTypeConf[contentType]
if sesstionType == constant.SuperGroupChatType && contentType == constant.HasReadReceipt { if sendID == recvID && contentType == constant.HasReadReceipt {
optionsConfig.ReliabilityLevel = constant.UnreliableNotification optionsConfig.ReliabilityLevel = constant.UnreliableNotification
} }
options := config.GetOptionsByNotification(optionsConfig) options := config.GetOptionsByNotification(optionsConfig)

Loading…
Cancel
Save