feat: add some logs (#2461)

* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* fix: component

* fix: getConversationInfo

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* fix: minio config url recognition error

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* mage

* optimization version log

* optimization version log

* sync

* sync

* sync

* group sync

* sync option

* sync option

* refactor: replace `friend` package with `realtion`.

* refactor: update lastest commit to relation.

* sync option

* sync option

* sync option

* sync

* sync

* go.mod

* seq

* update: go mod

* refactor: change incremental to full

* feat: get full friend user ids

* feat: api and config

* seq

* group version

* merge

* seq

* seq

* seq

* fix: sort by id avoid unstable sort friends.

* group

* group

* group

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* user version

* seq

* seq

* seq user

* user online

* implement minio expire delete.

* user online

* config

* fix

* fix

* implement minio expire delete logic.

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* feat: implement scheduled delete outdated object in minio.

* update gomake version

* update gomake version

* implement FindExpires pagination.

* remove unnesseary incr.

* fix uncorrect args call.

* online push

* online push

* online push

* resolving conflicts

* resolving conflicts

* test

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* rpc prommetrics

* rpc prommetrics

* online status

* online status

* online status

* online status

* sub

* conversation version incremental

* merge seq

* merge online

* merge online

* merge online

* merge seq

* GetOwnerConversation

* fix: change incremental syncer router name.

* rockscache batch get

* rockscache seq batch get

* fix: GetMsgDocModelByIndex bug

* update go.mod

* update go.mod

* merge

* feat: prometheus

* feat: prometheus

* group member sort

* sub

* sub

* fix: seq conversion bug

* fix: redis pipe exec

* sort version

* sort version

* sort version

* remove old version online subscription

* remove old version online subscription

* version log index

* version log index

* batch push

* batch push

* seq void filling

* fix: batchGetMaxSeq

* fix: batchGetMaxSeq

* cache db error log

* 111

* fix bug

* fix: ImportFriends

* add online cache

* add some logs

---------

Co-authored-by: withchao <withchao@users.noreply.github.com>
Co-authored-by: Monet Lee <monet_lee@163.com>
Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com>
Co-authored-by: icey-yu <1186114839@qq.com>
pull/2475/head
chao 4 months ago committed by GitHub
parent fb689618d8
commit d2b02dbabe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -198,6 +198,7 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.
offlineUserIDs = append(offlineUserIDs, userID) offlineUserIDs = append(offlineUserIDs, userID)
} }
} }
log.ZDebug(ctx, "GetConnsAndOnlinePush online cache", "sendID", msg.SendID, "recvID", msg.RecvID, "groupID", msg.GroupID, "sessionType", msg.SessionType, "clientMsgID", msg.ClientMsgID, "serverMsgID", msg.ServerMsgID, "offlineUserIDs", offlineUserIDs, "onlineUserIDs", onlineUserIDs)
var result []*msggateway.SingleMsgToUserResults var result []*msggateway.SingleMsgToUserResults
if len(onlineUserIDs) > 0 { if len(onlineUserIDs) > 0 {
var err error var err error

@ -5,6 +5,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"strconv" "strconv"
"time" "time"
@ -82,8 +83,11 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o
argv = append(argv, platformID) argv = append(argv, platformID)
} }
keys := []string{s.getUserOnlineKey(userID), userID, s.channelName} keys := []string{s.getUserOnlineKey(userID), userID, s.channelName}
if err := s.rdb.Eval(ctx, script, keys, argv).Err(); err != nil { status, err := s.rdb.Eval(ctx, script, keys, argv).Result()
if err != nil {
log.ZError(ctx, "redis SetUserOnline", err, "userID", userID, "online", online, "offline", offline)
return err return err
} }
log.ZDebug(ctx, "redis SetUserOnline", "userID", userID, "online", online, "offline", offline, "status", status)
return nil return nil
} }

@ -28,7 +28,7 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re
for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() { for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() {
userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload) userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload)
if err != nil { if err != nil {
log.ZError(ctx, "OnlineCache redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel) log.ZError(ctx, "OnlineCache setUserOnline redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
continue continue
} }
storageCache := x.setUserOnline(userID, platformIDs) storageCache := x.setUserOnline(userID, platformIDs)
@ -48,9 +48,15 @@ type OnlineCache struct {
} }
func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) { func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) {
return o.local.Get(userID, func() ([]int32, error) { platformIDs, err := o.local.Get(userID, func() ([]int32, error) {
return o.user.GetUserOnlinePlatform(ctx, userID) return o.user.GetUserOnlinePlatform(ctx, userID)
}) })
if err != nil {
log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userID)
return nil, err
}
log.ZDebug(ctx, "OnlineCache GetUserOnlinePlatform", "userID", userID, "platformIDs", platformIDs)
return nil, err
} }
func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, error) { func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, error) {
@ -61,39 +67,39 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e
return len(platformIDs) > 0, nil return len(platformIDs) > 0, nil
} }
func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) { //func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) {
onlineUserIDs := make([]string, 0, len(userIDs)) // onlineUserIDs := make([]string, 0, len(userIDs))
for _, userID := range userIDs { // for _, userID := range userIDs {
online, err := o.GetUserOnline(ctx, userID) // online, err := o.GetUserOnline(ctx, userID)
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
if online { // if online {
onlineUserIDs = append(onlineUserIDs, userID) // onlineUserIDs = append(onlineUserIDs, userID)
} // }
} // }
log.ZDebug(ctx, "OnlineCache GetUsersOnline", "userIDs", userIDs, "onlineUserIDs", onlineUserIDs) // log.ZDebug(ctx, "OnlineCache GetUsersOnline", "userIDs", userIDs, "onlineUserIDs", onlineUserIDs)
return onlineUserIDs, nil // return onlineUserIDs, nil
} //}
//
func (o *OnlineCache) GetGroupOnline(ctx context.Context, groupID string) ([]string, error) { //func (o *OnlineCache) GetGroupOnline(ctx context.Context, groupID string) ([]string, error) {
userIDs, err := o.group.GetGroupMemberIDs(ctx, groupID) // userIDs, err := o.group.GetGroupMemberIDs(ctx, groupID)
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
var onlineUserIDs []string // var onlineUserIDs []string
for _, userID := range userIDs { // for _, userID := range userIDs {
online, err := o.GetUserOnline(ctx, userID) // online, err := o.GetUserOnline(ctx, userID)
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
if online { // if online {
onlineUserIDs = append(onlineUserIDs, userID) // onlineUserIDs = append(onlineUserIDs, userID)
} // }
} // }
log.ZDebug(ctx, "OnlineCache GetGroupOnline", "groupID", groupID, "onlineUserIDs", onlineUserIDs, "allUserID", userIDs) // log.ZDebug(ctx, "OnlineCache GetGroupOnline", "groupID", groupID, "onlineUserIDs", onlineUserIDs, "allUserID", userIDs)
return onlineUserIDs, nil // return onlineUserIDs, nil
} //}
func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) bool { func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) bool {
return o.local.SetHas(userID, platformIDs) return o.local.SetHas(userID, platformIDs)

Loading…
Cancel
Save