fix: single chat unread status change. (#1171)

* fix: to start im or chat, ZooKeeper must be started first.

* fix: msg gateway start output err info

Signed-off-by: Gordon <1432970085@qq.com>

* fix: msg gateway start output err info

Signed-off-by: Gordon <1432970085@qq.com>

* chore: package path changes

Signed-off-by: withchao <993506633@qq.com>

* fix: go mod update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* chore: package path changes

Signed-off-by: withchao <993506633@qq.com>

* chore: package path changes

Signed-off-by: withchao <993506633@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: get all userID

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: msggateway add online status call

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: log change

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: log change

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* chore: network mode change

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* feat: add api of get server time

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* feat: remove go work sum

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: pull message add isRead field

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: check msg-transfer script

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: start don't kill old process

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: check component

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: pull message set isRead only message come from single.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: add ex field to update group info.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

* cicd: robot automated Change

* refactor: change project module name.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: change project module name.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: change project module name.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

* test: for pressure test.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* test: for pressure test.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* test: for pressure test.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* test: message log.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

* fxi: component check output valid info.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fxi: component check output valid info.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* test: send message test log.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

* cicd: robot automated Change

* test: remove info log.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* feat: api of send message add sendTime field.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: add callback for update user's info.

* cicd: robot automated Change

* fix: change callback command name.

* cicd: robot automated Change

* fix: single chat unread status change.

* fix: single chat unread status change.

* fix: single chat unread status change.

* fix: user status change.

* cicd: robot automated Change

* fix: user status change.

* fix: user status change.

* fix: user status change.

* cicd: robot automated Change

* fix: ws close when user logout.

---------

Signed-off-by: Gordon <1432970085@qq.com>
Signed-off-by: withchao <993506633@qq.com>
Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>
Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: withchao <993506633@qq.com>
Co-authored-by: Xinwei Xiong <3293172751NSS@gmail.com>
Co-authored-by: FGadvancer <FGadvancer@users.noreply.github.com>
pull/1219/head v3.3.3-beta.2
Gordon 1 year ago committed by GitHub
parent b783f0eb98
commit 8e6430625c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -37,7 +37,7 @@ require github.com/google/uuid v1.3.1
require ( require (
github.com/IBM/sarama v1.41.2 github.com/IBM/sarama v1.41.2
github.com/OpenIMSDK/protocol v0.0.24 github.com/OpenIMSDK/protocol v0.0.25
github.com/OpenIMSDK/tools v0.0.14 github.com/OpenIMSDK/tools v0.0.14
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible
github.com/go-redis/redis v6.15.9+incompatible github.com/go-redis/redis v6.15.9+incompatible

@ -18,8 +18,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/IBM/sarama v1.41.2 h1:ZDBZfGPHAD4uuAtSv4U22fRZBgst0eEwGFzLj0fb85c= github.com/IBM/sarama v1.41.2 h1:ZDBZfGPHAD4uuAtSv4U22fRZBgst0eEwGFzLj0fb85c=
github.com/IBM/sarama v1.41.2/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk= github.com/IBM/sarama v1.41.2/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk=
github.com/OpenIMSDK/protocol v0.0.24 h1:wk/S0GOGVh8mBbpmjKxSsyYMhyBazdn/ZcS9VqXfT24= github.com/OpenIMSDK/protocol v0.0.25 h1:AtB0Ia5LO26oqPoPJDIS4UMH3Wb2li96fMgfzI2cr4I=
github.com/OpenIMSDK/protocol v0.0.24/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/protocol v0.0.25/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.14 h1:WLof/+WxyPyRST+QkoTKubYCiV73uCLiL8pgnpH/yKQ= github.com/OpenIMSDK/tools v0.0.14 h1:WLof/+WxyPyRST+QkoTKubYCiV73uCLiL8pgnpH/yKQ=
github.com/OpenIMSDK/tools v0.0.14/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/OpenIMSDK/tools v0.0.14/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=

@ -207,8 +207,8 @@ func (c *Client) handleMessage(message []byte) error {
binaryReq.ReqIdentifier, binaryReq.ReqIdentifier,
) )
} }
c.replyMessage(ctx, &binaryReq, messageErr, resp)
return nil return c.replyMessage(ctx, &binaryReq, messageErr, resp)
} }
func (c *Client) setAppBackgroundStatus(ctx context.Context, req Req) ([]byte, error) { func (c *Client) setAppBackgroundStatus(ctx context.Context, req Req) ([]byte, error) {
@ -229,7 +229,7 @@ func (c *Client) close() {
c.longConnServer.UnRegister(c) c.longConnServer.UnRegister(c)
} }
func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, resp []byte) { func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, resp []byte) error {
errResp := apiresp.ParseError(err) errResp := apiresp.ParseError(err)
mReply := Resp{ mReply := Resp{
ReqIdentifier: binaryReq.ReqIdentifier, ReqIdentifier: binaryReq.ReqIdentifier,
@ -244,6 +244,10 @@ func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, re
if err != nil { if err != nil {
log.ZWarn(ctx, "wireBinaryMsg replyMessage", err, "resp", mReply.String()) log.ZWarn(ctx, "wireBinaryMsg replyMessage", err, "resp", mReply.String())
} }
if binaryReq.ReqIdentifier == WsLogoutMsg {
return errors.New("user logout")
}
return nil
} }
func (c *Client) PushMessage(ctx context.Context, msgData *sdkws.MsgData) error { func (c *Client) PushMessage(ctx context.Context, msgData *sdkws.MsgData) error {

@ -81,7 +81,8 @@ func (m *msgServer) SetConversationHasReadSeq(
if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil { if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
return nil, err return nil, err
} }
if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq); err != nil { if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID,
req.UserID, nil, req.HasReadSeq); err != nil {
return return
} }
return &msg.SetConversationHasReadSeqResp{}, nil return &msg.SetConversationHasReadSeqResp{}, nil
@ -119,7 +120,8 @@ func (m *msgServer) MarkMsgsAsRead(
return return
} }
} }
if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, m.conversationAndGetRecvID(conversation, req.UserID), req.Seqs, hasReadSeq); err != nil { if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID,
m.conversationAndGetRecvID(conversation, req.UserID), req.Seqs, hasReadSeq); err != nil {
return return
} }
return &msg.MarkMsgsAsReadResp{}, nil return &msg.MarkMsgsAsReadResp{}, nil
@ -131,44 +133,61 @@ func (m *msgServer) MarkConversationAsRead(
) (resp *msg.MarkConversationAsReadResp, err error) { ) (resp *msg.MarkConversationAsReadResp, err error) {
conversation, err := m.Conversation.GetConversation(ctx, req.UserID, req.ConversationID) conversation, err := m.Conversation.GetConversation(ctx, req.UserID, req.ConversationID)
if err != nil { if err != nil {
return return nil, err
} }
hasReadSeq, err := m.MsgDatabase.GetHasReadSeq(ctx, req.UserID, req.ConversationID) hasReadSeq, err := m.MsgDatabase.GetHasReadSeq(ctx, req.UserID, req.ConversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
return return nil, err
} }
log.ZDebug(ctx, "MarkConversationAsRead", "hasReadSeq", hasReadSeq, "req.HasReadSeq", req.HasReadSeq)
var seqs []int64 var seqs []int64
if len(req.Seqs) == 0 {
log.ZDebug(ctx, "MarkConversationAsRead", "hasReadSeq", hasReadSeq,
"req.HasReadSeq", req.HasReadSeq)
if conversation.ConversationType == constant.SingleChatType {
for i := hasReadSeq + 1; i <= req.HasReadSeq; i++ { for i := hasReadSeq + 1; i <= req.HasReadSeq; i++ {
seqs = append(seqs, i) seqs = append(seqs, i)
} }
} else {
seqs = req.Seqs
}
if len(seqs) > 0 { if len(seqs) > 0 {
log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID) log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID)
if err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil { if err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil {
return return nil, err
} }
} }
if req.HasReadSeq > hasReadSeq { if req.HasReadSeq > hasReadSeq {
err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq) err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq)
if err != nil { if err != nil {
return return nil, err
} }
hasReadSeq = req.HasReadSeq hasReadSeq = req.HasReadSeq
} }
if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq); err != nil { if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID,
return m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq); err != nil {
return nil, err
} }
} else if conversation.ConversationType == constant.SuperGroupChatType {
if req.HasReadSeq > hasReadSeq {
err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq)
if err != nil {
return nil, err
}
hasReadSeq = req.HasReadSeq
}
if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID,
req.UserID, seqs, hasReadSeq); err != nil {
return nil, err
}
}
return &msg.MarkConversationAsReadResp{}, nil return &msg.MarkConversationAsReadResp{}, nil
} }
func (m *msgServer) sendMarkAsReadNotification( func (m *msgServer) sendMarkAsReadNotification(
ctx context.Context, ctx context.Context,
conversationID string, conversationID string,
sesstionType int32, sessionType int32,
sendID, recvID string, sendID, recvID string,
seqs []int64, seqs []int64,
hasReadSeq int64, hasReadSeq int64,
@ -179,6 +198,9 @@ func (m *msgServer) sendMarkAsReadNotification(
Seqs: seqs, Seqs: seqs,
HasReadSeq: hasReadSeq, HasReadSeq: hasReadSeq,
} }
m.notificationSender.NotificationWithSesstionType(ctx, sendID, recvID, constant.HasReadReceipt, sesstionType, tips) err := m.notificationSender.NotificationWithSesstionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips)
if err != nil {
log.ZWarn(ctx, "send has read Receipt err", err)
}
return nil return nil
} }

@ -289,7 +289,8 @@ func (s *userServer) SubscribeOrCancelUsersStatus(ctx context.Context, req *pbus
} }
// GetUserStatus Get the online status of the user. // GetUserStatus Get the online status of the user.
func (s *userServer) GetUserStatus(ctx context.Context, req *pbuser.GetUserStatusReq) (resp *pbuser.GetUserStatusResp, err error) { func (s *userServer) GetUserStatus(ctx context.Context, req *pbuser.GetUserStatusReq) (resp *pbuser.GetUserStatusResp,
err error) {
onlineStatusList, err := s.UserDatabase.GetUserStatus(ctx, req.UserIDs) onlineStatusList, err := s.UserDatabase.GetUserStatus(ctx, req.UserIDs)
if err != nil { if err != nil {
return nil, err return nil, err
@ -298,31 +299,32 @@ func (s *userServer) GetUserStatus(ctx context.Context, req *pbuser.GetUserStatu
} }
// SetUserStatus Synchronize user's online status. // SetUserStatus Synchronize user's online status.
func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatusReq) (resp *pbuser.SetUserStatusResp, err error) { func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatusReq) (resp *pbuser.SetUserStatusResp,
err = s.UserDatabase.SetUserStatus(ctx, req.StatusList) err error) {
err = s.UserDatabase.SetUserStatus(ctx, req.UserID, req.Status, req.PlatformID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, value := range req.StatusList { list, err := s.UserDatabase.GetSubscribedList(ctx, req.UserID)
list, err := s.UserDatabase.GetSubscribedList(ctx, value.UserID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, userID := range list { for _, userID := range list {
tips := &sdkws.UserStatusChangeTips{ tips := &sdkws.UserStatusChangeTips{
FromUserID: value.UserID, FromUserID: req.UserID,
ToUserID: userID, ToUserID: userID,
Status: value.Status, Status: req.Status,
PlatformID: value.PlatformIDs[0], PlatformID: req.PlatformID,
} }
s.userNotificationSender.UserStatusChangeNotification(ctx, tips) s.userNotificationSender.UserStatusChangeNotification(ctx, tips)
} }
}
return &pbuser.SetUserStatusResp{}, nil return &pbuser.SetUserStatusResp{}, nil
} }
// GetSubscribeUsersStatus Get the online status of subscribers. // GetSubscribeUsersStatus Get the online status of subscribers.
func (s *userServer) GetSubscribeUsersStatus(ctx context.Context, req *pbuser.GetSubscribeUsersStatusReq) (*pbuser.GetSubscribeUsersStatusResp, error) { func (s *userServer) GetSubscribeUsersStatus(ctx context.Context,
req *pbuser.GetSubscribeUsersStatusReq) (*pbuser.GetSubscribeUsersStatusResp, error) {
userList, err := s.UserDatabase.GetAllSubscribeList(ctx, req.UserID) userList, err := s.UserDatabase.GetAllSubscribeList(ctx, req.UserID)
if err != nil { if err != nil {
return nil, err return nil, err

@ -21,6 +21,8 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/user" "github.com/OpenIMSDK/protocol/user"
@ -51,7 +53,7 @@ type UserCache interface {
GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error)
DelUsersGlobalRecvMsgOpt(userIDs ...string) UserCache DelUsersGlobalRecvMsgOpt(userIDs ...string) UserCache
GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error)
SetUserStatus(ctx context.Context, list []*user.OnlineStatus) error SetUserStatus(ctx context.Context, userID string, status, platformID int32) error
} }
type UserCacheRedis struct { type UserCacheRedis struct {
@ -198,58 +200,72 @@ func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
onlineStatus.UserID = userID onlineStatus.UserID = userID
onlineStatus.Status = constant.Online
res = append(res, &onlineStatus) res = append(res, &onlineStatus)
} }
return res, nil return res, nil
} }
// SetUserStatus Set the user status and save it in redis. // SetUserStatus Set the user status and save it in redis.
func (u *UserCacheRedis) SetUserStatus(ctx context.Context, list []*user.OnlineStatus) error { func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, status, platformID int32) error {
for _, status := range list { UserIDNum := crc32.ChecksumIEEE([]byte(userID))
var isNewKey int64
UserIDNum := crc32.ChecksumIEEE([]byte(status.UserID))
modKey := strconv.Itoa(int(UserIDNum % statusMod)) modKey := strconv.Itoa(int(UserIDNum % statusMod))
key := olineStatusKey + modKey key := olineStatusKey + modKey
jsonData, err := json.Marshal(status) log.ZDebug(ctx, "SetUserStatus args", "userID", userID, "status", status,
"platformID", platformID, "modKey", modKey, "key", key)
isNewKey, err := u.rdb.Exists(ctx, key).Result()
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
isNewKey, err = u.rdb.Exists(ctx, key).Result() if isNewKey == 0 {
if status == constant.Online {
onlineStatus := user.OnlineStatus{
UserID: userID,
Status: constant.Online,
PlatformIDs: []int32{platformID},
}
jsonData, err := json.Marshal(onlineStatus)
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
if isNewKey == 0 { _, err = u.rdb.HSet(ctx, key, userID, string(jsonData)).Result()
_, err = u.rdb.HSet(ctx, key, status.UserID, string(jsonData)).Result()
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
u.rdb.Expire(ctx, key, userOlineStatusExpireTime) u.rdb.Expire(ctx, key, userOlineStatusExpireTime)
} else { return nil
result, err := u.rdb.HGet(ctx, key, status.UserID).Result() }
}
isNil := false
result, err := u.rdb.HGet(ctx, key, userID).Result()
if err != nil { if err != nil {
if err == redis.Nil {
isNil = true
} else {
return errs.Wrap(err) return errs.Wrap(err)
} }
}
if status == constant.Offline {
if isNil {
log.ZWarn(ctx, "this user not online,maybe trigger order not right",
err, "userStatus", status)
return nil
}
var onlineStatus user.OnlineStatus var onlineStatus user.OnlineStatus
err = json.Unmarshal([]byte(result), &onlineStatus) err = json.Unmarshal([]byte(result), &onlineStatus)
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
onlineStatus.UserID = status.UserID
if status.Status == constant.Offline {
var newPlatformIDs []int32 var newPlatformIDs []int32
for _, val := range onlineStatus.PlatformIDs { for _, val := range onlineStatus.PlatformIDs {
if val != status.PlatformIDs[0] { if val != platformID {
newPlatformIDs = append(newPlatformIDs, val) newPlatformIDs = append(newPlatformIDs, val)
} }
} }
if newPlatformIDs == nil { if newPlatformIDs == nil {
onlineStatus.Status = constant.Offline _, err = u.rdb.HDel(ctx, key, userID).Result()
onlineStatus.PlatformIDs = []int32{}
newjsonData, err := json.Marshal(&onlineStatus)
if err != nil {
return errs.Wrap(err)
}
_, err = u.rdb.HSet(ctx, key, status.UserID, string(newjsonData)).Result()
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
@ -259,34 +275,32 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, list []*user.OnlineS
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
_, err = u.rdb.HSet(ctx, key, status.UserID, string(newjsonData)).Result() _, err = u.rdb.HSet(ctx, key, userID, string(newjsonData)).Result()
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
} }
} else { } else {
onlineStatus.Status = constant.Online var onlineStatus user.OnlineStatus
// Judging whether to be kicked out. if !isNil {
flag := false err = json.Unmarshal([]byte(result), &onlineStatus)
for _, val := range onlineStatus.PlatformIDs { if err != nil {
if val == status.PlatformIDs[0] { return errs.Wrap(err)
flag = true
break
}
} }
if !flag {
onlineStatus.PlatformIDs = append(onlineStatus.PlatformIDs, status.PlatformIDs[0])
} }
onlineStatus.Status = constant.Online
onlineStatus.UserID = userID
onlineStatus.PlatformIDs = append(onlineStatus.PlatformIDs, platformID)
newjsonData, err := json.Marshal(&onlineStatus) newjsonData, err := json.Marshal(&onlineStatus)
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
_, err = u.rdb.HSet(ctx, key, status.UserID, string(newjsonData)).Result() _, err = u.rdb.HSet(ctx, key, userID, string(newjsonData)).Result()
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
} }
}
}
return nil return nil
} }

@ -64,7 +64,7 @@ type UserDatabase interface {
// GetUserStatus Get the online status of the user // GetUserStatus Get the online status of the user
GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error)
// SetUserStatus Set the user status and store the user status in redis // SetUserStatus Set the user status and store the user status in redis
SetUserStatus(ctx context.Context, list []*user.OnlineStatus) error SetUserStatus(ctx context.Context, userID string, status, platformID int32) error
} }
type userDatabase struct { type userDatabase struct {
@ -217,6 +217,6 @@ func (u *userDatabase) GetUserStatus(ctx context.Context, userIDs []string) ([]*
} }
// SetUserStatus Set the user status and save it in redis. // SetUserStatus Set the user status and save it in redis.
func (u *userDatabase) SetUserStatus(ctx context.Context, list []*user.OnlineStatus) error { func (u *userDatabase) SetUserStatus(ctx context.Context, userID string, status, platformID int32) error {
return u.cache.SetUserStatus(ctx, list) return u.cache.SetUserStatus(ctx, userID, status, platformID)
} }

@ -163,8 +163,12 @@ func (u *UserMongoDriver) GetAllSubscribeList(ctx context.Context, userID string
bson.M{"user_id": SubscriptionPrefix + userID}) bson.M{"user_id": SubscriptionPrefix + userID})
err = cursor.Decode(&user) err = cursor.Decode(&user)
if err != nil { if err != nil {
if err == mongo.ErrNoDocuments {
return []string{}, nil
} else {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
}
return user.UserIDList, nil return user.UserIDList, nil
} }
@ -176,7 +180,11 @@ func (u *UserMongoDriver) GetSubscribedList(ctx context.Context, userID string)
bson.M{"user_id": SubscribedPrefix + userID}) bson.M{"user_id": SubscribedPrefix + userID})
err = cursor.Decode(&user) err = cursor.Decode(&user)
if err != nil { if err != nil {
if err == mongo.ErrNoDocuments {
return []string{}, nil
} else {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
}
return user.UserIDList, nil return user.UserIDList, nil
} }

@ -173,6 +173,9 @@ func (u *UserRpcClient) GetAllUserIDs(ctx context.Context, pageNumber, showNumbe
// SetUserStatus sets the status for a user based on the provided user ID, status, and platform ID. // SetUserStatus sets the status for a user based on the provided user ID, status, and platform ID.
func (u *UserRpcClient) SetUserStatus(ctx context.Context, userID string, status int32, platformID int) error { func (u *UserRpcClient) SetUserStatus(ctx context.Context, userID string, status int32, platformID int) error {
_, err := u.Client.SetUserStatus(ctx, &user.SetUserStatusReq{StatusList: []*user.OnlineStatus{{UserID: userID, Status: status, PlatformIDs: []int32{int32(platformID)}}}}) _, err := u.Client.SetUserStatus(ctx, &user.SetUserStatusReq{
UserID: userID,
Status: status, PlatformID: int32(platformID),
})
return err return err
} }

@ -133,7 +133,6 @@ func exactIP(urll string) string {
return host return host
} }
func checkMysql() error { func checkMysql() error {
var sqlDB *sql.DB var sqlDB *sql.DB
defer func() { defer func() {

@ -2,12 +2,14 @@ package main
import ( import (
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/tools/data-conversion/chat/conversion" "log"
"github.com/openimsdk/open-im-server/v3/tools/data-conversion/utils"
"gorm.io/driver/mysql" "gorm.io/driver/mysql"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/logger" "gorm.io/gorm/logger"
"log"
"github.com/openimsdk/open-im-server/v3/tools/data-conversion/chat/conversion"
"github.com/openimsdk/open-im-server/v3/tools/data-conversion/utils"
) )
func main() { func main() {

@ -16,7 +16,7 @@ package common
// =================================== V2 ===================================== // =================================== V2 =====================================
// MySQL // MySQL
// V2 // V2.
const ( const (
UsernameV2 = "root" UsernameV2 = "root"
PasswordV2 = "openIM" PasswordV2 = "openIM"
@ -24,7 +24,7 @@ const (
DatabaseV2 = "openIM_v2" DatabaseV2 = "openIM_v2"
) )
// V2 chat // V2 chat.
const ( const (
ChatUsernameV2 = "root" ChatUsernameV2 = "root"
ChatPasswordV2 = "openIM" ChatPasswordV2 = "openIM"
@ -32,14 +32,14 @@ const (
ChatDatabaseV2 = "admin_chat" ChatDatabaseV2 = "admin_chat"
) )
// Kafka // Kafka.
const ( const (
Topic = "ws2ms_chat" Topic = "ws2ms_chat"
KafkaAddr = "121.5.182.23:9092" KafkaAddr = "121.5.182.23:9092"
) )
// =================================== V3 ===================================== // =================================== V3 =====================================
// V3 // V3.
const ( const (
UsernameV3 = "root" UsernameV3 = "root"
PasswordV3 = "openIM123" PasswordV3 = "openIM123"
@ -47,7 +47,7 @@ const (
DatabaseV3 = "openIM_v3" DatabaseV3 = "openIM_v3"
) )
// V3 chat // V3 chat.
const ( const (
ChatUsernameV3 = "root" ChatUsernameV3 = "root"
ChatPasswordV3 = "openIM123" ChatPasswordV3 = "openIM123"
@ -55,7 +55,7 @@ const (
ChatDatabaseV3 = "openim_enterprise" ChatDatabaseV3 = "openim_enterprise"
) )
// Zookeeper // Zookeeper.
const ( const (
ZkAddr = "43.134.63.160:12181" ZkAddr = "43.134.63.160:12181"
ZKSchema = "openim" ZKSchema = "openim"

@ -3,20 +3,22 @@ package main
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"log"
"sync"
"sync/atomic"
"time"
"github.com/IBM/sarama" "github.com/IBM/sarama"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/msg" "github.com/OpenIMSDK/protocol/msg"
"github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/mw" "github.com/OpenIMSDK/tools/mw"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
pbmsg "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/proto/msg"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"log"
"sync" "github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"sync/atomic" pbmsg "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/proto/msg"
"time"
) )
func main() { func main() {

@ -16,12 +16,14 @@ package main
import ( import (
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/mysql/conversion" "log"
"github.com/openimsdk/open-im-server/v3/tools/data-conversion/utils"
"gorm.io/driver/mysql" "gorm.io/driver/mysql"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/logger" "gorm.io/gorm/logger"
"log"
"github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/mysql/conversion"
"github.com/openimsdk/open-im-server/v3/tools/data-conversion/utils"
) )
func main() { func main() {

@ -2,12 +2,14 @@ package mysql
import ( import (
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/mysql/conversion" "log"
"github.com/openimsdk/open-im-server/v3/tools/data-conversion/utils"
"gorm.io/driver/mysql" "gorm.io/driver/mysql"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/logger" "gorm.io/gorm/logger"
"log"
"github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/mysql/conversion"
"github.com/openimsdk/open-im-server/v3/tools/data-conversion/utils"
) )
func Cmd() { func Cmd() {

@ -2,6 +2,7 @@ package conversion
import ( import (
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
v3 "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" v3 "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
v2 "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/mysql/v2" v2 "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/mysql/v2"
"github.com/openimsdk/open-im-server/v3/tools/data-conversion/utils" "github.com/openimsdk/open-im-server/v3/tools/data-conversion/utils"

@ -35,7 +35,7 @@ type Group struct {
Introduction string `gorm:"column:introduction;size:255" json:"introduction"` Introduction string `gorm:"column:introduction;size:255" json:"introduction"`
FaceURL string `gorm:"column:face_url;size:255" json:"faceURL"` FaceURL string `gorm:"column:face_url;size:255" json:"faceURL"`
CreateTime time.Time `gorm:"column:create_time;index:create_time"` CreateTime time.Time `gorm:"column:create_time;index:create_time"`
Ex string `gorm:"column:ex" json:"ex;size:1024" json:"ex"` Ex string `gorm:"column:ex" json:"ex;size:1024"`
Status int32 `gorm:"column:status"` Status int32 `gorm:"column:status"`
CreatorUserID string `gorm:"column:creator_user_id;size:64"` CreatorUserID string `gorm:"column:creator_user_id;size:64"`
GroupType int32 `gorm:"column:group_type"` GroupType int32 `gorm:"column:group_type"`

@ -2,11 +2,12 @@ package utils
import ( import (
"fmt" "fmt"
"gorm.io/gorm"
"gorm.io/gorm/schema"
"log" "log"
"sync" "sync"
"sync/atomic" "sync/atomic"
"gorm.io/gorm"
"gorm.io/gorm/schema"
) )
func FindAndInsert[V2 any, V3 schema.Tabler](v2db *gorm.DB, v3db *gorm.DB, fn func(V2) (V3, bool)) (string, error) { func FindAndInsert[V2 any, V3 schema.Tabler](v2db *gorm.DB, v3db *gorm.DB, fn func(V2) (V3, bool)) (string, error) {

Loading…
Cancel
Save