Merge branch 'seq38' into online

# Conflicts:
#	go.mod
#	go.sum
#	internal/tools/cron_task.go
#	pkg/common/config/config.go
pull/2393/head
withchao 1 year ago
commit f315177a93

@ -43,7 +43,7 @@ COPY --from=builder $SERVER_DIR/start-config.yml $SERVER_DIR/
COPY --from=builder $SERVER_DIR/go.mod $SERVER_DIR/
COPY --from=builder $SERVER_DIR/go.sum $SERVER_DIR/
RUN go get github.com/openimsdk/gomake@v0.0.13
RUN go get github.com/openimsdk/gomake@v0.0.14-alpha.5
# Set the command to run when the container starts
ENTRYPOINT ["sh", "-c", "mage start && tail -f /dev/null"]

@ -1,2 +1,3 @@
chatRecordsClearTime: "0 2 * * *"
cronExecuteTime: "0 2 * * *"
retainChatRecords: 365
fileExpireTime: 90

@ -13,6 +13,9 @@ afterUpdateUserInfoEx:
afterSendSingleMsg:
enable: false
timeout: 5
# Only the senID/recvID specified in attentionIds will send the callback
# if not set, all user messages will be callback
attentionIds: []
beforeSendGroupMsg:
enable: false
timeout: 5

@ -14,7 +14,7 @@ require (
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.69-alpha.24
github.com/openimsdk/tools v0.0.49-alpha.25
github.com/openimsdk/tools v0.0.49-alpha.39
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0
@ -35,7 +35,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/kelindar/bitmap v1.5.2
github.com/likexian/gokit v0.25.13
github.com/openimsdk/gomake v0.0.13
github.com/openimsdk/gomake v0.0.14-alpha.5
github.com/redis/go-redis/v9 v9.4.0
github.com/robfig/cron/v3 v3.0.1
github.com/shirou/gopsutil v3.21.11+incompatible
@ -176,5 +176,3 @@ require (
golang.org/x/crypto v0.21.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
//replace github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol

@ -268,12 +268,12 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0=
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69-alpha.24 h1:TYcNJeWOTuE40UQ54eNPdDdy0KTOh9rAOgax8lCyhDc=
github.com/openimsdk/protocol v0.0.69-alpha.24/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.25 h1:OpRPwDZ2xWX7Zj5kyfZhryu/NfZTrsRVr2GFwu1HQHI=
github.com/openimsdk/tools v0.0.49-alpha.25/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69-alpha.22 h1:kifZWVNDkg9diXFJUJ/Q9xFc80cveBhc+1dUXcE9xHQ=
github.com/openimsdk/protocol v0.0.69-alpha.22/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.39 h1:bl5+q7xHsc/j1NnkN8/gYmn23RsNNbRizDY58d2EY1w=
github.com/openimsdk/tools v0.0.49-alpha.39/go.mod h1:zc0maZ2ohXlHd0ylY5JnCE8uqq/hslhcfcKa6iO5PCU=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

@ -67,6 +67,7 @@ func (o *GroupApi) GetGroupUsersReqApplicationList(c *gin.Context) {
func (o *GroupApi) GetGroupsInfo(c *gin.Context) {
a2r.Call(group.GroupClient.GetGroupsInfo, o.Client, c)
//a2r.Call(group.GroupClient.GetGroupsInfo, o.Client, c, a2r.NewNilReplaceOption(group.GroupClient.GetGroupsInfo))
}
func (o *GroupApi) KickGroupMember(c *gin.Context) {
@ -75,6 +76,7 @@ func (o *GroupApi) KickGroupMember(c *gin.Context) {
func (o *GroupApi) GetGroupMembersInfo(c *gin.Context) {
a2r.Call(group.GroupClient.GetGroupMembersInfo, o.Client, c)
//a2r.Call(group.GroupClient.GetGroupMembersInfo, o.Client, c, a2r.NewNilReplaceOption(group.GroupClient.GetGroupMembersInfo))
}
func (o *GroupApi) GetGroupMemberList(c *gin.Context) {

@ -101,6 +101,7 @@ func (m MessageApi) newUserSendMsgReq(_ *gin.Context, params *apistruct.SendMsg)
SendTime: params.SendTime,
Options: options,
OfflinePushInfo: params.OfflinePushInfo,
Ex: params.Ex,
},
}
return &pbData

@ -215,7 +215,7 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.
}
func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
log.ZDebug(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
var pushToUserIDs []string
if err = c.webhookBeforeGroupOnlinePush(ctx, &c.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg,
&pushToUserIDs); err != nil {

@ -61,7 +61,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
userRpcClient: &userRpcClient,
RegisterCenter: client,
authDatabase: controller.NewAuthDatabase(
redis2.NewTokenCacheModel(rdb),
redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire),
config.Share.Secret,
config.RpcConfig.TokenPolicy.Expire,
),

@ -17,17 +17,18 @@ package group
import (
"context"
"fmt"
"math/big"
"math/rand"
"strconv"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/common"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"math/big"
"math/rand"
"strconv"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
@ -531,6 +532,14 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou
if datautil.Contain(opUserID, req.KickedUserIDs...) {
return nil, errs.ErrArgs.WrapMsg("opUserID in KickedUserIDs")
}
owner, err := s.db.TakeGroupOwner(ctx, req.GroupID)
if err != nil {
return nil, err
}
if datautil.Contain(owner.UserID, req.KickedUserIDs...) {
return nil, errs.ErrArgs.WrapMsg("ownerUID can not Kick")
}
members, err := s.db.FindGroupMembers(ctx, req.GroupID, append(req.KickedUserIDs, opUserID))
if err != nil {
return nil, err
@ -590,7 +599,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou
FaceURL: group.FaceURL,
OwnerUserID: ownerUserID,
CreateTime: group.CreateTime.UnixMilli(),
MemberCount: num,
MemberCount: num - uint32(len(req.KickedUserIDs)),
Ex: group.Ex,
Status: group.Status,
CreatorUserID: group.CreatorUserID,

@ -36,6 +36,12 @@ import (
"github.com/openimsdk/tools/utils/stringutil"
)
// GroupApplicationReceiver
const (
applicantReceiver = iota
adminReceiver
)
func NewGroupNotificationSender(db controller.GroupDatabase, msgRpcClient *rpcclient.MessageRpcClient, userRpcClient *rpcclient.UserRpcClient, config *Config, fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)) *GroupNotificationSender {
return &GroupNotificationSender{
NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)),
@ -418,15 +424,17 @@ func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx conte
if err != nil {
return
}
tips := &sdkws.GroupApplicationAcceptedTips{Group: group, HandleMsg: req.HandledMsg}
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
var opUser *sdkws.GroupMemberFullInfo
if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil {
return
}
for _, userID := range append(userIDs, req.FromUserID) {
tips := &sdkws.GroupApplicationAcceptedTips{Group: group, OpUser: opUser, HandleMsg: req.HandledMsg}
if userID == req.FromUserID {
tips.ReceiverAs = 0
tips.ReceiverAs = applicantReceiver
} else {
tips.ReceiverAs = 1
tips.ReceiverAs = adminReceiver
}
g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationAcceptedNotification, tips)
}
@ -449,15 +457,17 @@ func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx conte
if err != nil {
return
}
tips := &sdkws.GroupApplicationRejectedTips{Group: group, HandleMsg: req.HandledMsg}
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
var opUser *sdkws.GroupMemberFullInfo
if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil {
return
}
for _, userID := range append(userIDs, req.FromUserID) {
tips := &sdkws.GroupApplicationAcceptedTips{Group: group, OpUser: opUser, HandleMsg: req.HandledMsg}
if userID == req.FromUserID {
tips.ReceiverAs = 0
tips.ReceiverAs = applicantReceiver
} else {
tips.ReceiverAs = 1
tips.ReceiverAs = adminReceiver
}
g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationRejectedNotification, tips)
}

@ -82,6 +82,9 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
}
return false
})
if vl.LogLen > 0 {
hasGroupUpdate = true
}
return vl, nil
},
CacheMaxVersion: s.db.FindMaxGroupMemberVersionCache,

@ -83,6 +83,11 @@ func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config
if msg.MsgData.ContentType == constant.Typing {
return
}
// According to the attentionIds configuration, only some users are sent
attentionIds := after.AttentionIds
if attentionIds != nil && !datautil.Contain(msg.MsgData.RecvID, attentionIds...) && !datautil.Contain(msg.MsgData.SendID, attentionIds...) {
return
}
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
RecvID: msg.MsgData.RecvID,

@ -16,13 +16,15 @@ package msg
import (
"context"
"github.com/openimsdk/tools/errs"
"github.com/redis/go-redis/v9"
pbmsg "github.com/openimsdk/protocol/msg"
)
func (m *msgServer) GetConversationMaxSeq(ctx context.Context, req *pbmsg.GetConversationMaxSeqReq) (*pbmsg.GetConversationMaxSeqResp, error) {
maxSeq, err := m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID)
if err != nil {
if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err
}
return &pbmsg.GetConversationMaxSeqResp{MaxSeq: maxSeq}, nil

@ -19,13 +19,17 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"path"
"strconv"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"go.mongodb.org/mongo-driver/mongo"
"github.com/google/uuid"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
@ -283,6 +287,52 @@ func (t *thirdServer) apiAddress(prefix, name string) string {
return prefix + name
}
func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
var conf config.Third
expireTime := time.UnixMilli(req.ExpireTime)
findPagination := &sdkws.RequestPagination{
PageNumber: 1,
ShowNumber: 1000,
}
for {
total, models, err := t.s3dataBase.FindByExpires(ctx, expireTime, findPagination)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
return nil, errs.Wrap(err)
}
needDelObjectKeys := make([]string, 0)
for _, model := range models {
needDelObjectKeys = append(needDelObjectKeys, model.Key)
}
needDelObjectKeys = datautil.Distinct(needDelObjectKeys)
for _, key := range needDelObjectKeys {
count, err := t.s3dataBase.FindNotDelByS3(ctx, key, expireTime)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
return nil, errs.Wrap(err)
}
if int(count) < 1 {
thumbnailKey, err := t.s3dataBase.GetImageThumbnailKey(ctx, key)
if err != nil {
return nil, errs.Wrap(err)
}
t.s3dataBase.DeleteObject(ctx, thumbnailKey)
t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, needDelObjectKeys...)
t.s3dataBase.DeleteObject(ctx, key)
}
}
for _, model := range models {
err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name)
if err != nil {
return nil, errs.Wrap(err)
}
}
if total < int64(findPagination.ShowNumber) {
break
}
}
return &third.DeleteOutdatedDataResp{}, nil
}
type FormDataMate struct {
Name string `json:"name"`
Size int64 `json:"size"`

@ -62,6 +62,11 @@ type userServer struct {
webhookClient *webhook.Client
}
func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUserOnlineStatusReq) (*pbuser.SetUserOnlineStatusResp, error) {
//TODO implement me
panic("implement me")
}
type Config struct {
RpcConfig config.User
RedisConfig config.Redis

@ -20,6 +20,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mw"
"google.golang.org/grpc"
@ -39,7 +40,7 @@ type CronTaskConfig struct {
}
func Start(ctx context.Context, config *CronTaskConfig) error {
log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime, "msgDestructTime", config.CronTask.RetainChatRecords)
log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords)
if config.CronTask.RetainChatRecords < 1 {
return errs.New("msg destruct time must be greater than 1").Wrap()
}
@ -66,16 +67,31 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
}
log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now))
}
delFile := func() {
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearFunc); err != nil {
return errs.Wrap(err)
}
tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
if err != nil {
return err
}
if _, err := crontab.AddFunc(config.CronTask.ChatRecordsClearTime, clearFunc); err != nil {
return errs.Wrap(err)
thirdClient := third.NewThirdClient(tConn)
deleteFunc := func() {
now := time.Now()
deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime))
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()))
log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil {
log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now))
return
}
log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
}
if _, err := crontab.AddFunc(config.CronTask.ChatRecordsClearTime, delFile); err != nil {
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteFunc); err != nil {
return errs.Wrap(err)
}
log.ZInfo(ctx, "start cron task", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime)
log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
crontab.Start()
<-ctx.Done()
return nil

@ -15,7 +15,7 @@
package apistruct
import (
sdkws "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/protocol/sdkws"
)
// SendMsg defines the structure for sending messages with various metadata.
@ -55,6 +55,9 @@ type SendMsg struct {
// OfflinePushInfo contains information for offline push notifications.
OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"`
// Ex stores extended fields
Ex string `json:"ex"`
}
// SendMsgReq extends SendMsg with the requirement of RecvID when SessionType indicates a one-on-one or notification chat.

@ -105,9 +105,9 @@ type API struct {
}
type CronTask struct {
ChatRecordsClearTime string `mapstructure:"chatRecordsClearTime"`
RetainChatRecords int `mapstructure:"retainChatRecords"`
FileTime int
CronExecuteTime string `mapstructure:"cronExecuteTime"`
RetainChatRecords int `mapstructure:"retainChatRecords"`
FileExpireTime int `mapstructure:"fileExpireTime"`
}
type OfflinePushConfig struct {
@ -340,8 +340,9 @@ type BeforeConfig struct {
}
type AfterConfig struct {
Enable bool `mapstructure:"enable"`
Timeout int `mapstructure:"timeout"`
Enable bool `mapstructure:"enable"`
Timeout int `mapstructure:"timeout"`
AttentionIds []string `mapstructure:"attentionIds"`
}
type Share struct {

@ -158,7 +158,6 @@ func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusC
}
return nil
case <-netDone:
close(netDone)
return netErr
}
}

@ -21,22 +21,36 @@ import (
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/stringutil"
"github.com/redis/go-redis/v9"
"time"
)
type tokenCache struct {
rdb redis.UniversalClient
rdb redis.UniversalClient
accessExpire time.Duration
}
func NewTokenCacheModel(rdb redis.UniversalClient) cache.TokenModel {
return &tokenCache{
rdb: rdb,
}
func NewTokenCacheModel(rdb redis.UniversalClient, accessExpire int64) cache.TokenModel {
c := &tokenCache{rdb: rdb}
c.accessExpire = c.getExpireTime(accessExpire)
return c
}
func (c *tokenCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
func (c *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), token, flag).Err())
}
// SetTokenFlagEx set token and flag with expire time
func (c *tokenCache) SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := cachekey.GetTokenKey(userID, platformID)
if err := c.rdb.HSet(ctx, key, token, flag).Err(); err != nil {
return errs.Wrap(err)
}
if err := c.rdb.Expire(ctx, key, c.accessExpire).Err(); err != nil {
return errs.Wrap(err)
}
return nil
}
func (c *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) {
m, err := c.rdb.HGetAll(ctx, cachekey.GetTokenKey(userID, platformID)).Result()
if err != nil {
@ -61,3 +75,7 @@ func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, pla
func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error {
return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err())
}
func (c *tokenCache) getExpireTime(t int64) time.Duration {
return time.Hour * 24 * time.Duration(t)
}

@ -5,7 +5,9 @@ import (
)
type TokenModel interface {
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
// SetTokenFlagEx set token and flag with expire time
SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error

@ -55,6 +55,7 @@ func (a *authDatabase) SetTokenMapByUidPid(ctx context.Context, userID string, p
// Create Token.
func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) {
isCreate := true // flag is create or update
tokens, err := a.cache.GetTokensWithoutError(ctx, userID, platformID)
if err != nil {
return "", err
@ -65,6 +66,9 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
if err != nil || v != constant.NormalToken {
deleteTokenKey = append(deleteTokenKey, k)
}
if v == constant.NormalToken {
isCreate = false
}
}
if len(deleteTokenKey) != 0 {
err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey)
@ -79,5 +83,17 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
if err != nil {
return "", errs.WrapMsg(err, "token.SignedString")
}
return tokenString, a.cache.AddTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken)
if isCreate {
// should create,should specify expiration time
if err = a.cache.SetTokenFlagEx(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil {
return "", err
}
} else {
// should update
if err = a.cache.SetTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil {
return "", err
}
}
return tokenString, nil
}

@ -335,9 +335,6 @@ func (f *friendDatabase) FindFriendsWithError(ctx context.Context, ownerUserID s
if err != nil {
return
}
if len(friends) != len(friendUserIDs) {
err = errs.ErrRecordNotFound.Wrap()
}
return
}

@ -378,6 +378,7 @@ func (g *groupDatabase) DeleteGroupMember(ctx context.Context, groupID string, u
DelGroupMembersInfo(groupID, userIDs...).
DelGroupAllRoleLevel(groupID).
DelMaxGroupMemberVersion(groupID).
DelMaxJoinGroupVersion(userIDs...).
ChainExecDel(ctx)
})
}
@ -400,10 +401,7 @@ func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string
func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error {
return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
if err := g.groupMemberDB.UpdateRoleLevel(ctx, groupID, oldOwnerUserID, roleLevel); err != nil {
return err
}
if err := g.groupMemberDB.UpdateRoleLevel(ctx, groupID, newOwnerUserID, constant.GroupOwner); err != nil {
if err := g.groupMemberDB.UpdateUserRoleLevels(ctx, groupID, oldOwnerUserID, roleLevel, newOwnerUserID, constant.GroupOwner); err != nil {
return err
}
c := g.cache.CloneGroupCache()

@ -16,13 +16,15 @@ package controller
import (
"context"
redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"path/filepath"
"time"
redisCache "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/s3"
"github.com/openimsdk/tools/s3/cont"
"github.com/redis/go-redis/v9"
@ -38,20 +40,28 @@ type S3Database interface {
SetObject(ctx context.Context, info *model.Object) error
StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error)
FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error)
FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error)
DeleteObject(ctx context.Context, name string) error
DeleteSpecifiedData(ctx context.Context, engine string, name string) error
FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error)
DelS3Key(ctx context.Context, engine string, keys ...string) error
GetImageThumbnailKey(ctx context.Context, name string) (string, error)
}
func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database {
return &s3Database{
s3: cont.New(redis2.NewS3Cache(rdb, s3), s3),
cache: redis2.NewObjectCacheRedis(rdb, obj),
db: obj,
s3: cont.New(redisCache.NewS3Cache(rdb, s3), s3),
cache: redisCache.NewObjectCacheRedis(rdb, obj),
s3cache: redisCache.NewS3Cache(rdb, s3),
db: obj,
}
}
type s3Database struct {
s3 *cont.Controller
cache cache.ObjectCache
db database.ObjectInfo
s3 *cont.Controller
cache cache.ObjectCache
s3cache cont.S3Cache
db database.ObjectInfo
}
func (s *s3Database) PartSize(ctx context.Context, size int64) (int64, error) {
@ -111,3 +121,26 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf
func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
return s.s3.FormData(ctx, name, size, contentType, duration)
}
func (s *s3Database) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) {
return s.db.FindByExpires(ctx, duration, pagination)
}
func (s *s3Database) DeleteObject(ctx context.Context, name string) error {
return s.s3.DeleteObject(ctx, name)
}
func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name string) error {
return s.db.Delete(ctx, engine, name)
}
func (s *s3Database) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) {
return s.db.FindNotDelByS3(ctx, key, duration)
}
func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error {
return s.s3cache.DelS3Key(ctx, engine, keys...)
}
func (s *s3Database) GetImageThumbnailKey(ctx context.Context, name string) (string, error) {
return s.s3.GetImageThumbnailKey(ctx, name)
}

@ -16,9 +16,10 @@ package controller
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/tools/db/pagination"

@ -25,6 +25,7 @@ type GroupMember interface {
Delete(ctx context.Context, groupID string, userIDs []string) (err error)
Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error)
UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) error
UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error
FindMemberUserID(ctx context.Context, groupID string) (userIDs []string, err error)
Take(ctx context.Context, groupID string, userID string) (groupMember *model.GroupMember, err error)
TakeOwner(ctx context.Context, groupID string) (groupMember *model.GroupMember, err error)

@ -115,11 +115,28 @@ func (g *GroupMemberMgo) Delete(ctx context.Context, groupID string, userIDs []s
func (g *GroupMemberMgo) UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) error {
return mongoutil.IncrVersion(func() error {
return g.Update(ctx, groupID, userID, bson.M{"role_level": roleLevel})
return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID},
bson.M{"$set": bson.M{"role_level": roleLevel}}, true)
}, func() error {
return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate)
})
}
func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error {
return mongoutil.IncrVersion(func() error {
if err := mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": firstUserID},
bson.M{"$set": bson.M{"role_level": firstUserRoleLevel}}, true); err != nil {
return err
}
if err := mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": secondUserID},
bson.M{"$set": bson.M{"role_level": secondUserRoleLevel}}, true); err != nil {
return err
}
return nil
}, func() error {
return g.member.IncrVersion(ctx, groupID, []string{firstUserID, secondUserID}, model.VersionStateUpdate)
})
}
func (g *GroupMemberMgo) Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error) {
if len(data) == 0 {

@ -16,10 +16,13 @@ package mgo
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
@ -68,3 +71,14 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model.
func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error {
return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine})
}
func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) {
return mongoutil.FindPage[*model.Object](ctx, o.coll, bson.M{
"create_time": bson.M{"$lt": duration},
}, pagination)
}
func (o *S3Mongo) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) {
return mongoutil.Count(ctx, o.coll, bson.M{
"key": key,
"create_time": bson.M{"$gt": duration},
})
}

@ -0,0 +1,39 @@
package mgo
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"testing"
"time"
)
func Result[V any](val V, err error) V {
if err != nil {
panic(err)
}
return val
}
func Check(err error) {
if err != nil {
panic(err)
}
}
func TestName(t *testing.T) {
cli := Result(mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
coll := cli.Database("openim_v3").Collection("version_test")
tmp, err := NewVersionLog(coll)
if err != nil {
panic(err)
}
vl := tmp.(*VersionLogMgo)
res, err := vl.writeLogBatch2(context.Background(), "100", []string{"1000", "1001", "1003"}, model.VersionStateInsert, time.Now())
if err != nil {
t.Log(err)
return
}
t.Logf("%+v", res)
}

@ -16,11 +16,16 @@ package database
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/pagination"
)
type ObjectInfo interface {
SetObject(ctx context.Context, obj *model.Object) error
Take(ctx context.Context, engine string, name string) (*model.Object, error)
Delete(ctx context.Context, engine string, name string) error
FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error)
FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error)
}

@ -16,15 +16,19 @@ package rpccache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
)
const (
conversationWorkerCount = 20
)
func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache {
@ -90,15 +94,33 @@ func (c *ConversationLocalCache) GetSingleConversationRecvMsgOpt(ctx context.Con
}
func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) {
conversations := make([]*pbconversation.Conversation, 0, len(conversationIDs))
var (
conversations = make([]*pbconversation.Conversation, 0, len(conversationIDs))
conversationsChan = make(chan *pbconversation.Conversation, len(conversationIDs))
)
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(conversationWorkerCount)
for _, conversationID := range conversationIDs {
conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
if err != nil {
if errs.ErrRecordNotFound.Is(err) {
continue
conversationID := conversationID
g.Go(func() error {
conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
if err != nil {
if errs.ErrRecordNotFound.Is(err) {
return nil
}
return err
}
return nil, err
}
conversationsChan <- conversation
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
close(conversationsChan)
for conversation := range conversationsChan {
conversations = append(conversations, conversation)
}
return conversations, nil

@ -41,3 +41,7 @@ func NewThird(discov discovery.SvcDiscoveryRegistry, rpcRegisterName, grafanaUrl
}
return &Third{discov: discov, Client: client, conn: conn, GrafanaUrl: grafanaUrl}
}
func (t *Third) DeleteOutdatedData(ctx context.Context, expires int64) error {
_, err := t.Client.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: expires})
return err
}

Loading…
Cancel
Save