Merge branch 'main' of github.com:openimsdk/open-im-server

pull/2566/head
Monet Lee 1 year ago
commit fcb210df0d

@ -8,6 +8,6 @@ prometheus:
# Whether to enable prometheus
enable: true
# Prometheus listening ports, must match the number of api.ports
ports: [ 20113 ]
ports: [ 20502 ]
# This address can be accessed via a browser
grafanaURL: http://127.0.0.1:13000/

@ -8,7 +8,7 @@ prometheus:
# Enable or disable Prometheus monitoring
enable: true
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
ports: [ 20112 ]
ports: [ 20640 ]
# IP address that the RPC/WebSocket service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
listenIP: 0.0.0.0

@ -3,4 +3,4 @@ prometheus:
enable: true
# List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly
# Because four instances have been launched, four ports need to be specified
ports: [ 20108, 20109, 20110, 20111 ]
ports: [ 20600, 20601, 20602, 20603 ]

@ -4,13 +4,13 @@ rpc:
# IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
listenIP: 0.0.0.0
# List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
ports: [ 10170 ]
ports: [ 10170, 10171, 10172, 10173 ]
prometheus:
# Enable or disable Prometheus monitoring
enable: true
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
ports: [ 20107 ]
ports: [ 20670, 20671, 20672, 20673 ]
maxConcurrentWorkers: 3
#Use geTui for offline push notifications, or choose fcm or jpns; corresponding configuration settings must be specified.

@ -10,7 +10,7 @@ prometheus:
# Enable or disable Prometheus monitoring
enable: true
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
ports: [ 20106 ]
ports: [ 20660 ]
tokenPolicy:
# Token validity period, in days

@ -10,4 +10,4 @@ prometheus:
# Enable or disable Prometheus monitoring
enable: true
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
ports: [ 20105 ]
ports: [ 20680 ]

@ -10,4 +10,4 @@ prometheus:
# Enable or disable Prometheus monitoring
enable: true
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
ports: [ 20104 ]
ports: [ 20620 ]

@ -10,4 +10,7 @@ prometheus:
# Enable or disable Prometheus monitoring
enable: true
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
ports: [ 20103 ]
ports: [ 20650 ]
enableHistoryForNewMembers: true

@ -10,7 +10,7 @@ prometheus:
# Enable or disable Prometheus monitoring
enable: true
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
ports: [ 20102 ]
ports: [ 20630 ]
# Does sending messages require friend verification

@ -10,7 +10,7 @@ prometheus:
# Enable or disable Prometheus monitoring
enable: true
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
ports: [ 20101 ]
ports: [ 20690 ]
object:

@ -10,7 +10,7 @@ prometheus:
# Whether to enable prometheus
enable: true
# Prometheus listening ports, must be consistent with the number of rpc.ports
ports: [ 20100 ]
ports: [ 20610 ]

@ -19,65 +19,65 @@ rule_files:
# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
# The job name is added as a label job=job_name" to any timeseries scraped from this config.
# The job name is added as a label "job=job_name" to any timeseries scraped from this config.
# Monitored information captured by prometheus
# prometheus fetches application services
- job_name: node_exporter
static_configs:
- targets: [ internal_ip:20114 ]
- targets: [ internal_ip:20500 ]
- job_name: openimserver-openim-api
static_configs:
- targets: [ internal_ip:20113 ]
- targets: [ internal_ip:20502 ]
labels:
namespace: default
- job_name: openimserver-openim-msggateway
static_configs:
- targets: [ internal_ip:20112 ]
- targets: [ internal_ip:20640 ]
labels:
namespace: default
- job_name: openimserver-openim-msgtransfer
static_configs:
- targets: [ internal_ip:20111, internal_ip:20110, internal_ip:20109, internal_ip:20108 ]
- targets: [ internal_ip:20600, internal_ip:20601, internal_ip:20602, internal_ip:20603 ]
labels:
namespace: default
- job_name: openimserver-openim-push
static_configs:
- targets: [ internal_ip:20107 ]
- targets: [ internal_ip:20670, internal_ip:20671, internal_ip:20672, internal_ip:20673]
labels:
namespace: default
- job_name: openimserver-openim-rpc-auth
static_configs:
- targets: [ internal_ip:20106 ]
- targets: [ internal_ip:20600 ]
labels:
namespace: default
- job_name: openimserver-openim-rpc-conversation
static_configs:
- targets: [ internal_ip:20105 ]
- targets: [ internal_ip:20680 ]
labels:
namespace: default
- job_name: openimserver-openim-rpc-friend
static_configs:
- targets: [ internal_ip:20104 ]
- targets: [ internal_ip:20620 ]
labels:
namespace: default
- job_name: openimserver-openim-rpc-group
static_configs:
- targets: [ internal_ip:20103 ]
- targets: [ internal_ip:20650 ]
labels:
namespace: default
- job_name: openimserver-openim-rpc-msg
static_configs:
- targets: [ internal_ip:20102 ]
- targets: [ internal_ip:20630 ]
labels:
namespace: default
- job_name: openimserver-openim-rpc-third
static_configs:
- targets: [ internal_ip:20101 ]
- targets: [ internal_ip:20690 ]
labels:
namespace: default
- job_name: openimserver-openim-rpc-user
static_configs:
- targets: [ internal_ip:20100 ]
- targets: [ internal_ip:20610 ]
labels:
namespace: default

@ -130,6 +130,13 @@ beforeSetGroupInfo:
enable: false
timeout: 5
failedContinue: true
afterSetGroupInfoEX:
enable: false
timeout: 5
beforeSetGroupInfoEX:
enable: false
timeout: 5
failedContinue: true
afterRevokeMsg:
enable: false
timeout: 5

@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.69
github.com/openimsdk/protocol v0.0.72-alpha.9
github.com/openimsdk/tools v0.0.49-alpha.55
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
@ -41,6 +41,7 @@ require (
github.com/spf13/viper v1.18.2
github.com/stathat/consistent v1.0.0
go.uber.org/automaxprocs v1.5.3
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/sync v0.6.0
)
@ -74,7 +75,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chai2010/webp v1.1.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/clbanning/mxj v1.8.4 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
@ -170,7 +170,6 @@ require (
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/image v0.15.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/oauth2 v0.17.0 // indirect

@ -71,8 +71,6 @@ github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZX
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chai2010/webp v1.1.1 h1:jTRmEccAJ4MGrhFOrPMpNGIJ/eybIgwKpcACsrTEapk=
github.com/chai2010/webp v1.1.1/go.mod h1:0XVwvZWdjjdxpUEIf7b9g9VkHFnInUSYujwqTLEuldU=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
@ -321,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69 h1:dVi8meSg8kmUzSH1XQab4MjihqKkkcCAmt1BYXPJuXo=
github.com/openimsdk/protocol v0.0.69/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.72-alpha.9 h1:Dyx4vs88IU4rJ2YcP/TdYp4ww8JjsMkV89hB/Eazx+A=
github.com/openimsdk/protocol v0.0.72-alpha.9/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k=
github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=

@ -35,6 +35,10 @@ func (o *GroupApi) SetGroupInfo(c *gin.Context) {
a2r.Call(group.GroupClient.SetGroupInfo, o.Client, c)
}
func (o *GroupApi) SetGroupInfoEX(c *gin.Context) {
a2r.Call(group.GroupClient.SetGroupInfoEX, o.Client, c)
}
func (o *GroupApi) JoinGroup(c *gin.Context) {
a2r.Call(group.GroupClient.JoinGroup, o.Client, c)
}

@ -2,12 +2,16 @@ package api
import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/go-playground/validator/v10"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"net/http"
"strings"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
@ -16,8 +20,6 @@ import (
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mw"
"net/http"
"strings"
)
func prommetricsGin() gin.HandlerFunc {
@ -112,6 +114,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
{
groupRouterGroup.POST("/create_group", g.CreateGroup)
groupRouterGroup.POST("/set_group_info", g.SetGroupInfo)
groupRouterGroup.POST("/set_group_info_ex", g.SetGroupInfoEX)
groupRouterGroup.POST("/join_group", g.JoinGroup)
groupRouterGroup.POST("/quit_group", g.QuitGroup)
groupRouterGroup.POST("/group_application_response", g.ApplicationGroupResponse)

@ -36,9 +36,11 @@ func (u *UserApi) UserRegister(c *gin.Context) {
a2r.Call(user.UserClient.UserRegister, u.Client, c)
}
// UpdateUserInfo is deprecated. Use UpdateUserInfoEx
func (u *UserApi) UpdateUserInfo(c *gin.Context) {
a2r.Call(user.UserClient.UpdateUserInfo, u.Client, c)
}
func (u *UserApi) UpdateUserInfoEx(c *gin.Context) {
a2r.Call(user.UserClient.UpdateUserInfoEx, u.Client, c)
}

@ -271,11 +271,13 @@ func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, re
ErrMsg: errResp.ErrMsg,
Data: resp,
}
t := time.Now()
log.ZDebug(ctx, "gateway reply message", "resp", mReply.String())
err = c.writeBinaryMsg(mReply)
if err != nil {
log.ZWarn(ctx, "wireBinaryMsg replyMessage", err, "resp", mReply.String())
}
log.ZDebug(ctx, "wireBinaryMsg end", "time cost", time.Since(t))
if binaryReq.ReqIdentifier == WsLogoutMsg {
return errs.New("user logout", "operationID", binaryReq.OperationID).Wrap()

@ -16,6 +16,7 @@ package msgtransfer
import (
"context"
"encoding/json"
"errors"
"github.com/IBM/sarama"
"github.com/go-redis/redis"
@ -89,6 +90,7 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont
och.conversationRpcClient = conversationRpcClient
och.groupRpcClient = groupRpcClient
och.historyConsumerGroup = historyConsumerGroup
return &och, err
}
func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[sarama.ConsumerMessage]) {
@ -97,6 +99,7 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
ctx = withAggregationCtx(ctx, ctxMessages)
log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages),
"key", val.Key())
och.doSetReadSeq(ctx, ctxMessages)
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList :=
och.categorizeMessageLists(ctxMessages)
@ -110,6 +113,60 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
och.handleNotification(ctx, val.Key(), conversationIDNotification, storageNotificationList, notStorageNotificationList)
}
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
type seqKey struct {
conversationID string
userID string
}
var readSeq map[seqKey]int64
for _, msg := range msgs {
if msg.message.ContentType != constant.HasReadReceipt {
continue
}
var elem sdkws.NotificationElem
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
continue
}
var tips sdkws.MarkAsReadTips
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
continue
}
if len(tips.Seqs) > 0 {
for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
clear(tips.Seqs)
tips.Seqs = nil
}
if tips.HasReadSeq < 0 {
continue
}
if readSeq == nil {
readSeq = make(map[seqKey]int64)
}
key := seqKey{
conversationID: tips.ConversationID,
userID: tips.MarkAsReadUserID,
}
if readSeq[key] > tips.HasReadSeq {
continue
}
readSeq[key] = tips.HasReadSeq
}
if readSeq == nil {
return
}
for key, seq := range readSeq {
if err := och.msgDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil {
log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq)
}
}
}
func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg {
var ctxMessages []*ContextMsg
for i := 0; i < len(consumerMessages); i++ {

@ -16,7 +16,6 @@ package msgtransfer
import (
"context"
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"

@ -0,0 +1,29 @@
package push
import (
"github.com/openimsdk/protocol/sdkws"
"testing"
)
func TestName(t *testing.T) {
var c ConsumerHandler
c.readCh = make(chan *sdkws.MarkAsReadTips)
go c.loopRead()
go func() {
for i := 0; ; i++ {
seq := int64(i + 1)
if seq%3 == 0 {
seq = 1
}
c.readCh <- &sdkws.MarkAsReadTips{
ConversationID: "c100",
MarkAsReadUserID: "u100",
HasReadSeq: seq,
}
}
}()
select {}
}

@ -91,9 +91,9 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
}
sec := msgFromMQ.MsgData.SendTime / 1000
nowSec := timeutil.GetCurrentTimestampBySecond()
log.ZDebug(ctx, "push msg", "msg", pbData.String(), "sec", sec, "nowSec", nowSec)
if nowSec-sec > 10 {
return
log.ZWarn(ctx, "long time push msg", nil, "msg", pbData.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec)
}
var err error
switch msgFromMQ.MsgData.SessionType {

@ -221,11 +221,11 @@ func (c *conversationServer) SetConversation(ctx context.Context, req *pbconvers
return resp, nil
}
// nolint
func (c *conversationServer) SetConversations(ctx context.Context, req *pbconversation.SetConversationsReq) (*pbconversation.SetConversationsResp, error) {
if req.Conversation == nil {
return nil, errs.ErrArgs.WrapMsg("conversation must not be nil")
}
if req.Conversation.ConversationType == constant.WriteGroupChatType {
groupInfo, err := c.groupRpcClient.GetGroupInfo(ctx, req.Conversation.GroupID)
if err != nil {
@ -235,17 +235,20 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
return nil, servererrs.ErrDismissedAlready.WrapMsg("group dismissed")
}
}
var unequal int
var conv dbModel.Conversation
if len(req.UserIDs) == 1 {
cs, err := c.conversationDatabase.FindConversations(ctx, req.UserIDs[0], []string{req.Conversation.ConversationID})
conversationMap := make(map[string]*dbModel.Conversation)
var needUpdateUsersList []string
for _, userID := range req.UserIDs {
conversationList, err := c.conversationDatabase.FindConversations(ctx, userID, []string{req.Conversation.ConversationID})
if err != nil {
return nil, err
}
if len(cs) == 0 {
return nil, errs.ErrRecordNotFound.WrapMsg("conversation not found")
if len(conversationList) != 0 {
conversationMap[userID] = conversationList[0]
} else {
needUpdateUsersList = append(needUpdateUsersList, userID)
}
conv = *cs[0]
}
var conversation dbModel.Conversation
@ -253,91 +256,121 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
conversation.ConversationType = req.Conversation.ConversationType
conversation.UserID = req.Conversation.UserID
conversation.GroupID = req.Conversation.GroupID
m := make(map[string]any)
setConversationFieldsFunc := func() {
if req.Conversation.RecvMsgOpt != nil {
m["recv_msg_opt"] = req.Conversation.RecvMsgOpt.Value
if req.Conversation.RecvMsgOpt.Value != conv.RecvMsgOpt {
unequal++
}
if req.Conversation.AttachedInfo != nil {
m["attached_info"] = req.Conversation.AttachedInfo.Value
}
if req.Conversation.Ex != nil {
m["ex"] = req.Conversation.Ex.Value
}
if req.Conversation.IsPinned != nil {
m["is_pinned"] = req.Conversation.IsPinned.Value
}
if req.Conversation.GroupAtType != nil {
m["group_at_type"] = req.Conversation.GroupAtType.Value
}
if req.Conversation.MsgDestructTime != nil {
m["msg_destruct_time"] = req.Conversation.MsgDestructTime.Value
}
if req.Conversation.MsgDestructTime != nil {
m["msg_destruct_time"] = req.Conversation.MsgDestructTime.Value
}
if req.Conversation.BurnDuration != nil {
m["burn_duration"] = req.Conversation.BurnDuration.Value
}
}
// set need set field in conversation
setConversationFieldsFunc()
for userID := range conversationMap {
unequal := len(m)
if req.Conversation.RecvMsgOpt != nil {
if req.Conversation.RecvMsgOpt.Value == conversationMap[userID].RecvMsgOpt {
unequal--
}
}
if req.Conversation.AttachedInfo != nil {
m["attached_info"] = req.Conversation.AttachedInfo.Value
if req.Conversation.AttachedInfo.Value != conv.AttachedInfo {
unequal++
if req.Conversation.AttachedInfo.Value == conversationMap[userID].AttachedInfo {
unequal--
}
}
if req.Conversation.Ex != nil {
m["ex"] = req.Conversation.Ex.Value
if req.Conversation.Ex.Value != conv.Ex {
unequal++
if req.Conversation.Ex.Value == conversationMap[userID].Ex {
unequal--
}
}
if req.Conversation.IsPinned != nil {
m["is_pinned"] = req.Conversation.IsPinned.Value
if req.Conversation.IsPinned.Value != conv.IsPinned {
unequal++
if req.Conversation.IsPinned.Value == conversationMap[userID].IsPinned {
unequal--
}
}
if req.Conversation.GroupAtType != nil {
m["group_at_type"] = req.Conversation.GroupAtType.Value
if req.Conversation.GroupAtType.Value != conv.GroupAtType {
unequal++
if req.Conversation.GroupAtType.Value == conversationMap[userID].GroupAtType {
unequal--
}
}
if req.Conversation.MsgDestructTime != nil {
m["msg_destruct_time"] = req.Conversation.MsgDestructTime.Value
if req.Conversation.MsgDestructTime.Value != conv.MsgDestructTime {
unequal++
if req.Conversation.MsgDestructTime.Value == conversationMap[userID].MsgDestructTime {
unequal--
}
}
if req.Conversation.IsMsgDestruct != nil {
m["is_msg_destruct"] = req.Conversation.IsMsgDestruct.Value
if req.Conversation.IsMsgDestruct.Value != conv.IsMsgDestruct {
unequal++
if req.Conversation.IsMsgDestruct.Value == conversationMap[userID].IsMsgDestruct {
unequal--
}
}
if req.Conversation.BurnDuration != nil {
m["burn_duration"] = req.Conversation.BurnDuration.Value
if req.Conversation.BurnDuration.Value != conv.BurnDuration {
unequal++
}
}
if len(m) != 0 {
if err := c.conversationDatabase.SetUsersConversationFieldTx(ctx, req.UserIDs, &conversation, m); err != nil {
return nil, err
if req.Conversation.BurnDuration.Value == conversationMap[userID].BurnDuration {
unequal--
}
}
if unequal > 0 {
for _, v := range req.UserIDs {
c.conversationNotificationSender.ConversationChangeNotification(ctx, v, []string{req.Conversation.ConversationID})
needUpdateUsersList = append(needUpdateUsersList, userID)
}
return &pbconversation.SetConversationsResp{}, nil
}
if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.ReadGroupChatType {
var conversations []*dbModel.Conversation
for _, ownerUserID := range req.UserIDs {
conversation2 := conversation
conversation2.OwnerUserID = ownerUserID
conversation2.IsPrivateChat = req.Conversation.IsPrivateChat.Value
conversations = append(conversations, &conversation2)
transConversation := conversation
transConversation.OwnerUserID = ownerUserID
transConversation.IsPrivateChat = req.Conversation.IsPrivateChat.Value
conversations = append(conversations, &transConversation)
}
if err := c.conversationDatabase.SyncPeerUserPrivateConversationTx(ctx, conversations); err != nil {
return nil, err
}
for _, userID := range req.UserIDs {
c.conversationNotificationSender.ConversationSetPrivateNotification(ctx, userID, req.Conversation.UserID,
req.Conversation.IsPrivateChat.Value, req.Conversation.ConversationID)
}
} else {
if len(m) != 0 && len(needUpdateUsersList) != 0 {
if err := c.conversationDatabase.SetUsersConversationFieldTx(ctx, needUpdateUsersList, &conversation, m); err != nil {
return nil, err
}
for _, v := range needUpdateUsersList {
c.conversationNotificationSender.ConversationChangeNotification(ctx, v, []string{req.Conversation.ConversationID})
}
}
}
return &pbconversation.SetConversationsResp{}, nil
@ -402,6 +435,14 @@ func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbc
return &pbconversation.SetConversationMaxSeqResp{}, nil
}
func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) {
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID,
map[string]any{"min_seq": req.MinSeq}); err != nil {
return nil, err
}
return &pbconversation.SetConversationMinSeqResp{}, nil
}
func (c *conversationServer) GetConversationIDs(ctx context.Context, req *pbconversation.GetConversationIDsReq) (*pbconversation.GetConversationIDsResp, error) {
conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.UserID)
if err != nil {

@ -358,3 +358,74 @@ func (s *groupServer) webhookAfterSetGroupInfo(ctx context.Context, after *confi
}
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterSetGroupInfoResp{}, after)
}
func (s *groupServer) webhookBeforeSetGroupInfoEX(ctx context.Context, before *config.BeforeConfig, req *group.SetGroupInfoEXReq) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
cbReq := &callbackstruct.CallbackBeforeSetGroupInfoEXReq{
CallbackCommand: callbackstruct.CallbackBeforeSetGroupInfoCommand,
GroupID: req.GroupInfoForSetEX.GroupID,
GroupName: req.GroupInfoForSetEX.GroupName,
Notification: req.GroupInfoForSetEX.Notification,
Introduction: req.GroupInfoForSetEX.Introduction,
FaceURL: req.GroupInfoForSetEX.FaceURL,
}
if req.GroupInfoForSetEX.Ex != nil {
cbReq.Ex = req.GroupInfoForSetEX.Ex
}
log.ZDebug(ctx, "debug CallbackBeforeSetGroupInfoEX", "ex", cbReq.Ex)
if req.GroupInfoForSetEX.NeedVerification != nil {
cbReq.NeedVerification = req.GroupInfoForSetEX.NeedVerification
}
if req.GroupInfoForSetEX.LookMemberInfo != nil {
cbReq.LookMemberInfo = req.GroupInfoForSetEX.LookMemberInfo
}
if req.GroupInfoForSetEX.ApplyMemberFriend != nil {
cbReq.ApplyMemberFriend = req.GroupInfoForSetEX.ApplyMemberFriend
}
resp := &callbackstruct.CallbackBeforeSetGroupInfoEXResp{}
if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
datautil.NotNilReplace(&req.GroupInfoForSetEX.GroupID, &resp.GroupID)
datautil.NotNilReplace(&req.GroupInfoForSetEX.GroupName, &resp.GroupName)
datautil.NotNilReplace(&req.GroupInfoForSetEX.FaceURL, &resp.FaceURL)
datautil.NotNilReplace(&req.GroupInfoForSetEX.Introduction, &resp.Introduction)
datautil.NotNilReplace(&req.GroupInfoForSetEX.Ex, &resp.Ex)
datautil.NotNilReplace(&req.GroupInfoForSetEX.NeedVerification, &resp.NeedVerification)
datautil.NotNilReplace(&req.GroupInfoForSetEX.LookMemberInfo, &resp.LookMemberInfo)
datautil.NotNilReplace(&req.GroupInfoForSetEX.ApplyMemberFriend, &resp.ApplyMemberFriend)
return nil
})
}
func (s *groupServer) webhookAfterSetGroupInfoEX(ctx context.Context, after *config.AfterConfig, req *group.SetGroupInfoEXReq) {
cbReq := &callbackstruct.CallbackAfterSetGroupInfoEXReq{
CallbackCommand: callbackstruct.CallbackAfterSetGroupInfoCommand,
GroupID: req.GroupInfoForSetEX.GroupID,
GroupName: req.GroupInfoForSetEX.GroupName,
Notification: req.GroupInfoForSetEX.Notification,
Introduction: req.GroupInfoForSetEX.Introduction,
FaceURL: req.GroupInfoForSetEX.FaceURL,
}
if req.GroupInfoForSetEX.Ex != nil {
cbReq.Ex = req.GroupInfoForSetEX.Ex
}
if req.GroupInfoForSetEX.NeedVerification != nil {
cbReq.NeedVerification = req.GroupInfoForSetEX.NeedVerification
}
if req.GroupInfoForSetEX.LookMemberInfo != nil {
cbReq.LookMemberInfo = req.GroupInfoForSetEX.LookMemberInfo
}
if req.GroupInfoForSetEX.ApplyMemberFriend != nil {
cbReq.ApplyMemberFriend = req.GroupInfoForSetEX.ApplyMemberFriend
}
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterSetGroupInfoEXResp{}, after)
}

@ -54,6 +54,39 @@ func UpdateGroupInfoMap(ctx context.Context, group *sdkws.GroupInfoForSet) map[s
return m
}
func UpdateGroupInfoEXMap(ctx context.Context, group *sdkws.GroupInfoForSetEX) map[string]any {
m := make(map[string]any)
if group.GroupName != "" {
m["group_name"] = group.GroupName
}
if group.Notification != nil {
m["notification"] = group.Notification.Value
m["notification_update_time"] = time.Now()
m["notification_user_id"] = mcontext.GetOpUserID(ctx)
}
if group.Introduction != nil {
m["introduction"] = group.Introduction.Value
}
if group.FaceURL != nil {
m["face_url"] = group.FaceURL.Value
}
if group.NeedVerification != nil {
m["need_verification"] = group.NeedVerification.Value
}
if group.LookMemberInfo != nil {
m["look_member_info"] = group.LookMemberInfo.Value
}
if group.ApplyMemberFriend != nil {
m["apply_member_friend"] = group.ApplyMemberFriend.Value
}
if group.Ex != nil {
m["ex"] = group.Ex.Value
}
return m
}
func UpdateGroupStatusMap(status int) map[string]any {
return map[string]any{
"status": status,

File diff suppressed because it is too large Load Diff

@ -21,7 +21,9 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
@ -43,12 +45,22 @@ const (
adminReceiver
)
func NewGroupNotificationSender(db controller.GroupDatabase, msgRpcClient *rpcclient.MessageRpcClient, userRpcClient *rpcclient.UserRpcClient, config *Config, fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)) *GroupNotificationSender {
func NewGroupNotificationSender(
db controller.GroupDatabase,
msgRpcClient *rpcclient.MessageRpcClient,
userRpcClient *rpcclient.UserRpcClient,
conversationRpcClient *rpcclient.ConversationRpcClient,
config *Config,
fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error),
) *GroupNotificationSender {
return &GroupNotificationSender{
NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)),
getUsersInfo: fn,
db: db,
config: config,
conversationRpcClient: conversationRpcClient,
msgRpcClient: msgRpcClient,
}
}
@ -57,6 +69,9 @@ type GroupNotificationSender struct {
getUsersInfo func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)
db controller.GroupDatabase
config *Config
conversationRpcClient *rpcclient.ConversationRpcClient
msgRpcClient *rpcclient.MessageRpcClient
}
func (g *GroupNotificationSender) PopulateGroupMember(ctx context.Context, members ...*model.GroupMember) error {
@ -494,50 +509,67 @@ func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context,
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips)
}
func (g *GroupNotificationSender) MemberInvitedNotification(ctx context.Context, groupID, reason string, invitedUserIDList []string) {
func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID ...string) error {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return
}
var users []*sdkws.GroupMemberFullInfo
users, err = g.getGroupMembers(ctx, groupID, invitedUserIDList)
if !g.config.RpcConfig.EnableHistoryForNewMembers {
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
maxSeq, err := g.msgRpcClient.GetConversationMaxSeq(ctx, conversationID)
if err != nil {
return
return err
}
if _, err = g.msgRpcClient.SetUserConversationsMinSeq(ctx, &msg.SetUserConversationsMinSeqReq{
UserIDs: entrantUserID,
ConversationID: conversationID,
Seq: maxSeq,
}); err != nil {
return err
}
tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users}
err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID)
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips)
}
func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID string) {
var err error
defer func() {
if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, groupID, entrantUserID); err != nil {
return err
}
opUserID := mcontext.GetOpUserID(ctx)
var opUser *sdkws.GroupMemberFullInfo
if authverify.IsAppManagerUid(ctx, g.config.Share.IMAdminUserID) {
opUser = &sdkws.GroupMemberFullInfo{
GroupID: groupID,
UserID: opUserID,
AppMangerLevel: constant.AppAdmin,
}
} else {
users, err := g.getGroupMembers(ctx, groupID, []string{opUserID})
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
return err
}
if len(users) == 0 {
opUser = &sdkws.GroupMemberFullInfo{
GroupID: groupID,
UserID: opUserID,
}
} else {
opUser = users[0]
}
}
}()
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return
return err
}
var user *sdkws.GroupMemberFullInfo
user, err = g.getGroupMember(ctx, groupID, entrantUserID)
users, err := g.getGroupMembers(ctx, groupID, entrantUserID)
if err != nil {
return
return err
}
tips := &sdkws.MemberEnterTips{Group: group, EntrantUser: user}
tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users, OpUser: opUser}
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberEnterNotification, tips)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips)
return nil
}
func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) {

@ -53,3 +53,12 @@ func (m *msgServer) GetMsgByConversationIDs(ctx context.Context, req *pbmsg.GetM
}
return &pbmsg.GetMsgByConversationIDsResp{MsgDatas: Msgs}, nil
}
func (m *msgServer) SetUserConversationsMinSeq(ctx context.Context, req *pbmsg.SetUserConversationsMinSeqReq) (*pbmsg.SetUserConversationsMinSeqResp, error) {
for _, userID := range req.UserIDs {
if err := m.MsgDatabase.SetUserConversationsMinSeqs(ctx, userID, map[string]int64{req.ConversationID: req.Seq}); err != nil {
return nil, err
}
}
return &pbmsg.SetUserConversationsMinSeqResp{}, nil
}

@ -17,6 +17,11 @@ package user
import (
"context"
"errors"
"math/rand"
"strings"
"sync"
"time"
"github.com/openimsdk/open-im-server/v3/internal/rpc/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
@ -29,10 +34,6 @@ import (
"github.com/openimsdk/protocol/group"
friendpb "github.com/openimsdk/protocol/relation"
"github.com/openimsdk/tools/db/redisutil"
"math/rand"
"strings"
"sync"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
@ -147,41 +148,35 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
return nil, err
}
s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID)
//friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID)
//if err != nil {
// return nil, err
//}
//if req.UserInfo.Nickname != "" || req.UserInfo.FaceURL != "" {
// if err = s.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID,oldUser); err != nil {
// return nil, err
// }
//}
//for _, friendID := range friends {
// s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID)
//}
s.webhookAfterUpdateUserInfo(ctx, &s.config.WebhooksConfig.AfterUpdateUserInfo, req)
if err = s.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID, oldUser); err != nil {
return nil, err
}
return resp, nil
}
func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) (resp *pbuser.UpdateUserInfoExResp, err error) {
resp = &pbuser.UpdateUserInfoExResp{}
err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, s.config.Share.IMAdminUserID)
if err != nil {
return nil, err
}
if err = s.webhookBeforeUpdateUserInfoEx(ctx, &s.config.WebhooksConfig.BeforeUpdateUserInfoEx, req); err != nil {
return nil, err
}
oldUser, err := s.db.GetUserByID(ctx, req.UserInfo.UserID)
if err != nil {
return nil, err
}
data := convert.UserPb2DBMapEx(req.UserInfo)
if err = s.db.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil {
return nil, err
}
s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID)
//friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID)
//if err != nil {
@ -199,6 +194,7 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse
if err := s.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID, oldUser); err != nil {
return nil, err
}
return resp, nil
}
func (s *userServer) SetGlobalRecvMessageOpt(ctx context.Context, req *pbuser.SetGlobalRecvMessageOptReq) (resp *pbuser.SetGlobalRecvMessageOptResp, err error) {

@ -0,0 +1 @@
package apistruct

@ -18,7 +18,9 @@ const (
CallbackBeforeInviteJoinGroupCommand = "callbackBeforeInviteJoinGroupCommand"
CallbackAfterJoinGroupCommand = "callbackAfterJoinGroupCommand"
CallbackAfterSetGroupInfoCommand = "callbackAfterSetGroupInfoCommand"
CallbackAfterSetGroupInfoEXCommand = "callbackAfterSetGroupInfoCommandEX"
CallbackBeforeSetGroupInfoCommand = "callbackBeforeSetGroupInfoCommand"
CallbackBeforeSetGroupInfoEXCommand = "callbackBeforeSetGroupInfoEXCommand"
CallbackAfterRevokeMsgCommand = "callbackBeforeAfterMsgCommand"
CallbackBeforeAddBlackCommand = "callbackBeforeAddBlackCommand"
CallbackAfterAddFriendCommand = "callbackAfterAddFriendCommand"

@ -17,6 +17,7 @@ package callbackstruct
import (
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
common "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/protocol/wrapperspb"
)
type CallbackCommand string
@ -242,3 +243,48 @@ type CallbackAfterSetGroupInfoReq struct {
type CallbackAfterSetGroupInfoResp struct {
CommonCallbackResp
}
type CallbackBeforeSetGroupInfoEXReq struct {
CallbackCommand `json:"callbackCommand"`
OperationID string `json:"operationID"`
GroupID string `json:"groupID"`
GroupName string `json:"groupName"`
Notification *wrapperspb.StringValue `json:"notification"`
Introduction *wrapperspb.StringValue `json:"introduction"`
FaceURL *wrapperspb.StringValue `json:"faceURL"`
Ex *wrapperspb.StringValue `json:"ex"`
NeedVerification *wrapperspb.Int32Value `json:"needVerification"`
LookMemberInfo *wrapperspb.Int32Value `json:"lookMemberInfo"`
ApplyMemberFriend *wrapperspb.Int32Value `json:"applyMemberFriend"`
}
type CallbackBeforeSetGroupInfoEXResp struct {
CommonCallbackResp
GroupID string `json:"groupID"`
GroupName string `json:"groupName"`
Notification *wrapperspb.StringValue `json:"notification"`
Introduction *wrapperspb.StringValue `json:"introduction"`
FaceURL *wrapperspb.StringValue `json:"faceURL"`
Ex *wrapperspb.StringValue `json:"ex"`
NeedVerification *wrapperspb.Int32Value `json:"needVerification"`
LookMemberInfo *wrapperspb.Int32Value `json:"lookMemberInfo"`
ApplyMemberFriend *wrapperspb.Int32Value `json:"applyMemberFriend"`
}
type CallbackAfterSetGroupInfoEXReq struct {
CallbackCommand `json:"callbackCommand"`
OperationID string `json:"operationID"`
GroupID string `json:"groupID"`
GroupName string `json:"groupName"`
Notification *wrapperspb.StringValue `json:"notification"`
Introduction *wrapperspb.StringValue `json:"introduction"`
FaceURL *wrapperspb.StringValue `json:"faceURL"`
Ex *wrapperspb.StringValue `json:"ex"`
NeedVerification *wrapperspb.Int32Value `json:"needVerification"`
LookMemberInfo *wrapperspb.Int32Value `json:"lookMemberInfo"`
ApplyMemberFriend *wrapperspb.Int32Value `json:"applyMemberFriend"`
}
type CallbackAfterSetGroupInfoEXResp struct {
CommonCallbackResp
}

@ -259,6 +259,7 @@ type Group struct {
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"`
}
type Msg struct {
@ -421,6 +422,8 @@ type Webhooks struct {
BeforeInviteUserToGroup BeforeConfig `mapstructure:"beforeInviteUserToGroup"`
AfterSetGroupInfo AfterConfig `mapstructure:"afterSetGroupInfo"`
BeforeSetGroupInfo BeforeConfig `mapstructure:"beforeSetGroupInfo"`
AfterSetGroupInfoEX AfterConfig `mapstructure:"afterSetGroupInfoEX"`
BeforeSetGroupInfoEX BeforeConfig `mapstructure:"beforeSetGroupInfoEX"`
AfterRevokeMsg AfterConfig `mapstructure:"afterRevokeMsg"`
BeforeAddBlack BeforeConfig `mapstructure:"beforeAddBlack"`
AfterAddFriend AfterConfig `mapstructure:"afterAddFriend"`

@ -8,6 +8,7 @@ import (
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"strconv"
"strings"
"time"
)
@ -66,11 +67,10 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o
local change = (num1 ~= num2) or (num2 ~= num3)
if change then
local members = redis.call("ZRANGE", key, 0, -1)
table.insert(members, KEYS[2])
redis.call("PUBLISH", KEYS[3], table.concat(members, ":"))
return 1
table.insert(members, "1")
return members
else
return 0
return {"0"}
end
`
now := time.Now()
@ -82,12 +82,24 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o
for _, platformID := range online {
argv = append(argv, platformID)
}
keys := []string{s.getUserOnlineKey(userID), userID, s.channelName}
status, err := s.rdb.Eval(ctx, script, keys, argv).Result()
keys := []string{s.getUserOnlineKey(userID)}
platformIDs, err := s.rdb.Eval(ctx, script, keys, argv).StringSlice()
if err != nil {
log.ZError(ctx, "redis SetUserOnline", err, "userID", userID, "online", online, "offline", offline)
return err
}
log.ZDebug(ctx, "redis SetUserOnline", "userID", userID, "online", online, "offline", offline, "status", status)
if len(platformIDs) == 0 {
return errs.ErrInternalServer.WrapMsg("SetUserOnline redis lua invalid return value")
}
if platformIDs[len(platformIDs)-1] != "0" {
log.ZDebug(ctx, "redis SetUserOnline push", "userID", userID, "online", online, "offline", offline, "platformIDs", platformIDs[:len(platformIDs)-1])
platformIDs[len(platformIDs)-1] = userID
msg := strings.Join(platformIDs, ":")
if err := s.rdb.Publish(ctx, s.channelName, msg).Err(); err != nil {
return errs.Wrap(err)
}
} else {
log.ZDebug(ctx, "redis SetUserOnline not push", "userID", userID, "online", online, "offline", offline)
}
return nil
}

@ -0,0 +1,51 @@
package redis
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/redisutil"
"testing"
"time"
)
/*
address: [ 172.16.8.48:7001, 172.16.8.48:7002, 172.16.8.48:7003, 172.16.8.48:7004, 172.16.8.48:7005, 172.16.8.48:7006 ]
username:
password: passwd123
clusterMode: true
db: 0
maxRetry: 10
*/
func TestName111111(t *testing.T) {
conf := config.Redis{
Address: []string{
"172.16.8.124:7001",
"172.16.8.124:7002",
"172.16.8.124:7003",
"172.16.8.124:7004",
"172.16.8.124:7005",
"172.16.8.124:7006",
},
ClusterMode: true,
Password: "passwd123",
//Address: []string{"localhost:16379"},
//Password: "openIM123",
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000)
defer cancel()
rdb, err := redisutil.NewRedisClient(ctx, conf.Build())
if err != nil {
panic(err)
}
online := NewUserOnline(rdb)
userID := "a123456"
t.Log(online.GetOnline(ctx, userID))
t.Log(online.SetUserOnline(ctx, userID, []int32{1, 2, 3, 4}, nil))
t.Log(online.GetOnline(ctx, userID))
}
func TestName111(t *testing.T) {
}

@ -74,17 +74,22 @@ func (s *seqUserCacheRedis) GetUserReadSeq(ctx context.Context, conversationID s
}
func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
dbSeq, err := s.GetUserReadSeq(ctx, conversationID, userID)
if err != nil {
return err
}
}
if dbSeq < seq {
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (s *seqUserCacheRedis) SetUserReadSeqToDB(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq)
}
func (s *seqUserCacheRedis) SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
keys := make([]string, 0, len(seqs))
for conversationID, seq := range seqs {
@ -128,13 +133,6 @@ func (s *seqUserCacheRedis) SetUserReadSeqs(ctx context.Context, userID string,
if err := s.setUserRedisReadSeqs(ctx, userID, seqs); err != nil {
return err
}
for conversationID, seq := range seqs {
if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
}
return nil
}

@ -9,6 +9,7 @@ type SeqUser interface {
SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserReadSeqToDB(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
SetUserReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error
GetUserReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)

@ -77,6 +77,7 @@ type CommonMsgDatabase interface {
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error)
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
@ -808,6 +809,10 @@ func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, c
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq)
}
func (db *commonMsgDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq)
}
func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return db.seqUser.GetUserReadSeqs(ctx, userID, conversationIDs)
}

@ -115,5 +115,12 @@ func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conve
}
func (s *seqUserMongo) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
dbSeq, err := s.GetUserReadSeq(ctx, conversationID, userID)
if err != nil {
return err
}
if dbSeq > seq {
return nil
}
return s.setSeq(ctx, conversationID, userID, seq, "read_seq")
}

@ -77,6 +77,11 @@ func (c *ConversationRpcClient) SetConversationMaxSeq(ctx context.Context, owner
return err
}
func (c *ConversationRpcClient) SetConversationMinSeq(ctx context.Context, ownerUserIDs []string, conversationID string, minSeq int64) error {
_, err := c.Client.SetConversationMinSeq(ctx, &pbconversation.SetConversationMinSeqReq{OwnerUserID: ownerUserIDs, ConversationID: conversationID, MinSeq: minSeq})
return err
}
func (c *ConversationRpcClient) SetConversations(ctx context.Context, userIDs []string, conversation *pbconversation.ConversationReq) error {
_, err := c.Client.SetConversations(ctx, &pbconversation.SetConversationsReq{UserIDs: userIDs, Conversation: conversation})
return err

@ -159,6 +159,15 @@ func (m *MessageRpcClient) SendMsg(ctx context.Context, req *msg.SendMsgReq) (*m
return resp, nil
}
// SetUserConversationsMinSeq set min seq
func (m *MessageRpcClient) SetUserConversationsMinSeq(ctx context.Context, req *msg.SetUserConversationsMinSeqReq) (*msg.SetUserConversationsMinSeqResp, error) {
resp, err := m.Client.SetUserConversationsMinSeq(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
// GetMaxSeq retrieves the maximum sequence number from the gRPC client.
// Errors during the gRPC call are wrapped to provide additional context.
func (m *MessageRpcClient) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) {
@ -346,7 +355,9 @@ func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, co
}
func (s *NotificationSender) NotificationWithSessionType(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, opts ...NotificationOptions) {
s.queue.Push(func() { s.send(ctx, sendID, recvID, contentType, sessionType, m, opts...) })
if err := s.queue.Push(func() { s.send(ctx, sendID, recvID, contentType, sessionType, m, opts...) }); err != nil {
log.ZWarn(ctx, "Push to queue failed", err, "sendID", sendID, "recvID", recvID, "msg", jsonutil.StructToJsonString(m))
}
}
func (s *NotificationSender) Notification(ctx context.Context, sendID, recvID string, contentType int32, m proto.Message, opts ...NotificationOptions) {

@ -3,7 +3,7 @@ serviceBinaries:
openim-crontask: 1
openim-rpc-user: 1
openim-msggateway: 1
openim-push: 1
openim-push: 4
openim-msgtransfer: 4
openim-rpc-conversation: 1
openim-rpc-auth: 1

Loading…
Cancel
Save