Merge branch 'openimsdk:main' into main

pull/2622/head
icey-yu 1 year ago committed by GitHub
commit 79c40a8527
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -198,6 +198,7 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.
offlineUserIDs = append(offlineUserIDs, userID) offlineUserIDs = append(offlineUserIDs, userID)
} }
} }
log.ZDebug(ctx, "GetConnsAndOnlinePush online cache", "sendID", msg.SendID, "recvID", msg.RecvID, "groupID", msg.GroupID, "sessionType", msg.SessionType, "clientMsgID", msg.ClientMsgID, "serverMsgID", msg.ServerMsgID, "offlineUserIDs", offlineUserIDs, "onlineUserIDs", onlineUserIDs)
var result []*msggateway.SingleMsgToUserResults var result []*msggateway.SingleMsgToUserResults
if len(onlineUserIDs) > 0 { if len(onlineUserIDs) > 0 {
var err error var err error

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package friend package relation
import ( import (
"context" "context"

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package friend package relation
import ( import (
"context" "context"

@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package friend package relation
import ( import (
"context" "context"
"github.com/openimsdk/tools/mq/memamq" "github.com/openimsdk/tools/mq/memamq"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package friend package relation
import ( import (
"context" "context"

@ -1,4 +1,4 @@
package friend package relation
import ( import (
"context" "context"

@ -17,7 +17,7 @@ package user
import ( import (
"context" "context"
"errors" "errors"
"github.com/openimsdk/open-im-server/v3/internal/rpc/friend" "github.com/openimsdk/open-im-server/v3/internal/rpc/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
@ -54,7 +54,7 @@ import (
type userServer struct { type userServer struct {
online cache.OnlineCache online cache.OnlineCache
db controller.UserDatabase db controller.UserDatabase
friendNotificationSender *friend.FriendNotificationSender friendNotificationSender *relation.FriendNotificationSender
userNotificationSender *UserNotificationSender userNotificationSender *UserNotificationSender
friendRpcClient *rpcclient.FriendRpcClient friendRpcClient *rpcclient.FriendRpcClient
groupRpcClient *rpcclient.GroupRpcClient groupRpcClient *rpcclient.GroupRpcClient
@ -105,7 +105,7 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
RegisterCenter: client, RegisterCenter: client,
friendRpcClient: &friendRpcClient, friendRpcClient: &friendRpcClient,
groupRpcClient: &groupRpcClient, groupRpcClient: &groupRpcClient,
friendNotificationSender: friend.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, friend.WithDBFunc(database.FindWithError)), friendNotificationSender: relation.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, relation.WithDBFunc(database.FindWithError)),
userNotificationSender: NewUserNotificationSender(config, &msgRpcClient, WithUserFunc(database.FindWithError)), userNotificationSender: NewUserNotificationSender(config, &msgRpcClient, WithUserFunc(database.FindWithError)),
config: config, config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),

@ -17,7 +17,7 @@ package cmd
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/friend" "github.com/openimsdk/open-im-server/v3/internal/rpc/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/system/program"
@ -28,21 +28,21 @@ type FriendRpcCmd struct {
*RootCmd *RootCmd
ctx context.Context ctx context.Context
configMap map[string]any configMap map[string]any
friendConfig *friend.Config relationConfig *relation.Config
} }
func NewFriendRpcCmd() *FriendRpcCmd { func NewFriendRpcCmd() *FriendRpcCmd {
var friendConfig friend.Config var relationConfig relation.Config
ret := &FriendRpcCmd{friendConfig: &friendConfig} ret := &FriendRpcCmd{relationConfig: &relationConfig}
ret.configMap = map[string]any{ ret.configMap = map[string]any{
OpenIMRPCFriendCfgFileName: &friendConfig.RpcConfig, OpenIMRPCFriendCfgFileName: &relationConfig.RpcConfig,
RedisConfigFileName: &friendConfig.RedisConfig, RedisConfigFileName: &relationConfig.RedisConfig,
MongodbConfigFileName: &friendConfig.MongodbConfig, MongodbConfigFileName: &relationConfig.MongodbConfig,
ShareFileName: &friendConfig.Share, ShareFileName: &relationConfig.Share,
NotificationFileName: &friendConfig.NotificationConfig, NotificationFileName: &relationConfig.NotificationConfig,
WebhooksConfigFileName: &friendConfig.WebhooksConfig, WebhooksConfigFileName: &relationConfig.WebhooksConfig,
LocalCacheConfigFileName: &friendConfig.LocalCacheConfig, LocalCacheConfigFileName: &relationConfig.LocalCacheConfig,
DiscoveryConfigFilename: &friendConfig.Discovery, DiscoveryConfigFilename: &relationConfig.Discovery,
} }
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version) ret.ctx = context.WithValue(context.Background(), "version", version.Version)
@ -57,7 +57,7 @@ func (a *FriendRpcCmd) Exec() error {
} }
func (a *FriendRpcCmd) runE() error { func (a *FriendRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.friendConfig.Discovery, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP,
a.friendConfig.RpcConfig.RPC.RegisterIP, a.friendConfig.RpcConfig.RPC.Ports, a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.Ports,
a.Index(), a.friendConfig.Share.RpcRegisterName.Friend, &a.friendConfig.Share, a.friendConfig, friend.Start) a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start)
} }

@ -5,6 +5,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"strconv" "strconv"
"time" "time"
@ -82,8 +83,11 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o
argv = append(argv, platformID) argv = append(argv, platformID)
} }
keys := []string{s.getUserOnlineKey(userID), userID, s.channelName} keys := []string{s.getUserOnlineKey(userID), userID, s.channelName}
if err := s.rdb.Eval(ctx, script, keys, argv).Err(); err != nil { status, err := s.rdb.Eval(ctx, script, keys, argv).Result()
if err != nil {
log.ZError(ctx, "redis SetUserOnline", err, "userID", userID, "online", online, "offline", offline)
return err return err
} }
log.ZDebug(ctx, "redis SetUserOnline", "userID", userID, "online", online, "offline", offline, "status", status)
return nil return nil
} }

@ -28,7 +28,7 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re
for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() { for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() {
userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload) userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload)
if err != nil { if err != nil {
log.ZError(ctx, "OnlineCache redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel) log.ZError(ctx, "OnlineCache setUserOnline redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
continue continue
} }
storageCache := x.setUserOnline(userID, platformIDs) storageCache := x.setUserOnline(userID, platformIDs)
@ -48,9 +48,15 @@ type OnlineCache struct {
} }
func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) { func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) {
return o.local.Get(userID, func() ([]int32, error) { platformIDs, err := o.local.Get(userID, func() ([]int32, error) {
return o.user.GetUserOnlinePlatform(ctx, userID) return o.user.GetUserOnlinePlatform(ctx, userID)
}) })
if err != nil {
log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userID)
return nil, err
}
log.ZDebug(ctx, "OnlineCache GetUserOnlinePlatform", "userID", userID, "platformIDs", platformIDs)
return platformIDs, nil
} }
func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, error) { func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, error) {
@ -61,39 +67,39 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e
return len(platformIDs) > 0, nil return len(platformIDs) > 0, nil
} }
func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) { //func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) {
onlineUserIDs := make([]string, 0, len(userIDs)) // onlineUserIDs := make([]string, 0, len(userIDs))
for _, userID := range userIDs { // for _, userID := range userIDs {
online, err := o.GetUserOnline(ctx, userID) // online, err := o.GetUserOnline(ctx, userID)
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
if online { // if online {
onlineUserIDs = append(onlineUserIDs, userID) // onlineUserIDs = append(onlineUserIDs, userID)
} // }
} // }
log.ZDebug(ctx, "OnlineCache GetUsersOnline", "userIDs", userIDs, "onlineUserIDs", onlineUserIDs) // log.ZDebug(ctx, "OnlineCache GetUsersOnline", "userIDs", userIDs, "onlineUserIDs", onlineUserIDs)
return onlineUserIDs, nil // return onlineUserIDs, nil
} //}
//
func (o *OnlineCache) GetGroupOnline(ctx context.Context, groupID string) ([]string, error) { //func (o *OnlineCache) GetGroupOnline(ctx context.Context, groupID string) ([]string, error) {
userIDs, err := o.group.GetGroupMemberIDs(ctx, groupID) // userIDs, err := o.group.GetGroupMemberIDs(ctx, groupID)
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
var onlineUserIDs []string // var onlineUserIDs []string
for _, userID := range userIDs { // for _, userID := range userIDs {
online, err := o.GetUserOnline(ctx, userID) // online, err := o.GetUserOnline(ctx, userID)
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
if online { // if online {
onlineUserIDs = append(onlineUserIDs, userID) // onlineUserIDs = append(onlineUserIDs, userID)
} // }
} // }
log.ZDebug(ctx, "OnlineCache GetGroupOnline", "groupID", groupID, "onlineUserIDs", onlineUserIDs, "allUserID", userIDs) // log.ZDebug(ctx, "OnlineCache GetGroupOnline", "groupID", groupID, "onlineUserIDs", onlineUserIDs, "allUserID", userIDs)
return onlineUserIDs, nil // return onlineUserIDs, nil
} //}
func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) bool { func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) bool {
return o.local.SetHas(userID, platformIDs) return o.local.SetHas(userID, platformIDs)

Loading…
Cancel
Save