diff --git a/cmd/openim-rpc/openim-rpc-conversation/main.go b/cmd/openim-rpc/openim-rpc-conversation/main.go index 5b2e66c95..5b3077ccb 100644 --- a/cmd/openim-rpc/openim-rpc-conversation/main.go +++ b/cmd/openim-rpc/openim-rpc-conversation/main.go @@ -17,9 +17,13 @@ package main import ( "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/system/program" + "os" ) func main() { + if len(os.Args) == 1 { + os.Args = []string{os.Args[0], "-i", "0", "-c", "/Users/chao/Desktop/project/open-im-server/config"} + } if err := cmd.NewConversationRpcCmd().Exec(); err != nil { program.ExitWithError(err) } diff --git a/cmd/openim-rpc/openim-rpc-group/main.go b/cmd/openim-rpc/openim-rpc-group/main.go index 5badf934e..44e5509df 100644 --- a/cmd/openim-rpc/openim-rpc-group/main.go +++ b/cmd/openim-rpc/openim-rpc-group/main.go @@ -17,9 +17,13 @@ package main import ( "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/system/program" + "os" ) func main() { + if len(os.Args) == 1 { + os.Args = []string{os.Args[0], "-i", "0", "-c", "/Users/chao/Desktop/project/open-im-server/config"} + } if err := cmd.NewGroupRpcCmd().Exec(); err != nil { program.ExitWithError(err) } diff --git a/config/discovery.yml b/config/discovery.yml index 3d96ff9b6..9cd23c590 100644 --- a/config/discovery.yml +++ b/config/discovery.yml @@ -1,13 +1,13 @@ enable: "etcd" etcd: rootDirectory: openim - address: [ localhost:12379 ] + address: [ 172.16.8.48:12379 ] username: '' password: '' zookeeper: schema: openim - address: [ localhost:12181 ] + address: [ 172.16.8.48:12181 ] username: '' password: '' diff --git a/config/kafka.yml b/config/kafka.yml index d412e1be0..d9b7ffa3c 100644 --- a/config/kafka.yml +++ b/config/kafka.yml @@ -7,7 +7,7 @@ producerAck: "" # Compression type to use (e.g., none, gzip, snappy) compressType: "none" # List of Kafka broker addresses -address: [ localhost:19094 ] +address: [ 172.16.8.48:19094 ] # Kafka topic for Redis integration toRedisTopic: "toRedis" # Kafka topic for MongoDB integration diff --git a/config/minio.yml b/config/minio.yml index 11a9ace35..d143a1da3 100644 --- a/config/minio.yml +++ b/config/minio.yml @@ -7,9 +7,9 @@ secretAccessKey: "openIM123" # Session token for MinIO authentication (optional) sessionToken: '' # Internal address of the MinIO server -internalAddress: "localhost:10005" +internalAddress: "172.16.8.48:10005" # External address of the MinIO server, accessible from outside. Supports both HTTP and HTTPS using a domain name -externalAddress: "http://external_ip:10005" +externalAddress: "http://172.16.8.48:10005" # Flag to enable or disable public read access to the bucket publicRead: false diff --git a/config/mongodb.yml b/config/mongodb.yml index 98f5694e4..53969298b 100644 --- a/config/mongodb.yml +++ b/config/mongodb.yml @@ -1,7 +1,7 @@ # URI for database connection, leave empty if using address and credential settings directly uri: '' # List of MongoDB server addresses -address: [ localhost:37017 ] +address: [ 172.16.8.48:37017 ] # Name of the database database: openim_v3 # Username for database authentication diff --git a/config/redis.yml b/config/redis.yml index 87abed0e1..404d18953 100644 --- a/config/redis.yml +++ b/config/redis.yml @@ -1,4 +1,4 @@ -address: [ localhost:16379 ] +address: [ 172.16.8.48:16379 ] username: '' password: openIM123 clusterMode: false diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index defec16df..17e550fef 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -309,7 +309,7 @@ func getRemoteAdders(client []*Client) string { } func (ws *WsServer) KickUserConn(client *Client) error { - ws.clients.deleteClients(client.UserID, []*Client{client}) + ws.clients.DeleteClients(client.UserID, []*Client{client}) return client.KickOnlineMessage() } @@ -325,7 +325,7 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien if !clientOK { return } - ws.clients.deleteClients(newClient.UserID, oldClients) + ws.clients.DeleteClients(newClient.UserID, oldClients) for _, c := range oldClients { err := c.KickOnlineMessage() if err != nil { @@ -345,7 +345,7 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien func (ws *WsServer) unregisterClient(client *Client) { defer ws.clientPool.Put(client) - isDeleteUser := ws.clients.delete(client.UserID, client.ctx.GetRemoteAddr()) + isDeleteUser := ws.clients.Delete(client.UserID, client.ctx.GetRemoteAddr()) if isDeleteUser { ws.onlineUserNum.Add(-1) prommetrics.OnlineUserGauge.Dec() diff --git a/internal/msggateway/user_map.go b/internal/msggateway/user_map.go index 79cc53d1b..f8bf69f9a 100644 --- a/internal/msggateway/user_map.go +++ b/internal/msggateway/user_map.go @@ -72,7 +72,7 @@ func (u *UserMap) Set(key string, v *Client) { } } -func (u *UserMap) delete(key string, connRemoteAddr string) (isDeleteUser bool) { +func (u *UserMap) Delete(key string, connRemoteAddr string) (isDeleteUser bool) { // Attempt to load the clients associated with the key. allClients, existed := u.m.Load(key) if !existed { @@ -101,7 +101,7 @@ func (u *UserMap) delete(key string, connRemoteAddr string) (isDeleteUser bool) return false } -func (u *UserMap) deleteClients(key string, clients []*Client) (isDeleteUser bool) { +func (u *UserMap) DeleteClients(key string, clients []*Client) (isDeleteUser bool) { m := datautil.SliceToMapAny(clients, func(c *Client) (string, struct{}) { return c.ctx.GetRemoteAddr(), struct{}{} }) diff --git a/internal/msggateway/user_map2.go b/internal/msggateway/user_map2.go new file mode 100644 index 000000000..1c9a91c6a --- /dev/null +++ b/internal/msggateway/user_map2.go @@ -0,0 +1,32 @@ +package msggateway + +/* + +sorted set + +userID: 10000 + +USER_ONLINE:10000 + + + + + +platformID: 1 + + + + + +key score + +E1 123456789 +O1 234567895 + + + + + + + +*/ diff --git a/pkg/common/storage/cache/redis/online.go b/pkg/common/storage/cache/redis/online.go new file mode 100644 index 000000000..138f9a573 --- /dev/null +++ b/pkg/common/storage/cache/redis/online.go @@ -0,0 +1,58 @@ +package redis + +import ( + "context" + "github.com/redis/go-redis/v9" + "time" +) + +type userOnline struct { + rdb redis.UniversalClient + expire time.Duration + channelName string +} + +func (s *userOnline) getUserOnlineKey(userID string) string { + return "USER_ONLINE:" + userID +} + +func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, offline []int32) error { + script := ` + local key = KEYS[1] + local score = ARGV[3] + local num1 = redis.call("ZCARD", key) + redis.call("ZREMRANGEBYSCORE", key, "-inf", ARGV[2]) + for i = 5, tonumber(ARGV[4])+4 do + redis.call("ZREM", key, ARGV[i]) + end + local num2 = redis.call("ZCARD", key) + for i = 5+tonumber(ARGV[4]), #ARGV do + redis.call("ZADD", key, score, ARGV[i]) + end + redis.call("EXPIRE", key, ARGV[1]) + local num3 = redis.call("ZCARD", key) + local change = (num1 ~= num2) or (num2 ~= num3) + if change then + local members = redis.call("ZRANGE", key, 0, -1) + table.insert(members, KEYS[2]) + redis.call("PUBLISH", KEYS[3], table.concat(members, ":")) + return 1 + else + return 0 + end +` + now := time.Now() + argv := make([]any, 0, 2+len(online)+len(offline)) + argv = append(argv, int32(s.expire/time.Second), now.Unix(), now.Add(s.expire).Unix(), int32(len(offline))) + for _, platformID := range offline { + argv = append(argv, platformID) + } + for _, platformID := range online { + argv = append(argv, platformID) + } + keys := []string{s.getUserOnlineKey(userID), userID, s.channelName} + if err := s.rdb.Eval(ctx, script, keys, argv).Err(); err != nil { + return err + } + return nil +} diff --git a/pkg/common/storage/cache/redis/seq_user_test.go b/pkg/common/storage/cache/redis/seq_user_test.go new file mode 100644 index 000000000..04a5d49cb --- /dev/null +++ b/pkg/common/storage/cache/redis/seq_user_test.go @@ -0,0 +1,70 @@ +package redis + +import ( + "context" + "fmt" + "github.com/redis/go-redis/v9" + "log" + "testing" + "time" +) + +func newTestOnline() *userOnline { + opt := &redis.Options{ + Addr: "172.16.8.48:16379", + Password: "openIM123", + DB: 1, + } + rdb := redis.NewClient(opt) + if err := rdb.Ping(context.Background()).Err(); err != nil { + panic(err) + } + return &userOnline{rdb: rdb, expire: time.Hour, channelName: "user_online"} +} + +func TestOnline(t *testing.T) { + ts := newTestOnline() + + //err := ts.SetUserOnline(context.Background(), "1000", []int32{1, 2, 3}, []int32{4, 5, 6}) + err := ts.SetUserOnline(context.Background(), "1000", nil, []int32{1, 2, 3}) + + t.Log(err) + +} + +/* + +local function tableToString(tbl, separator) + local result = {} + for _, v in ipairs(tbl) do + table.insert(result, tostring(v)) + end + return table.concat(result, separator) +end + +local myTable = {"one", "two", "three"} +local result = tableToString(myTable, ":") + +print(result) + +*/ + +func TestRecvOnline(t *testing.T) { + ts := newTestOnline() + ctx := context.Background() + pubsub := ts.rdb.Subscribe(ctx, "user_online") + + // 等待订阅确认 + _, err := pubsub.Receive(ctx) + if err != nil { + log.Fatalf("Could not subscribe: %v", err) + } + + // 创建一个通道来接收消息 + ch := pubsub.Channel() + + // 处理接收到的消息 + for msg := range ch { + fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload) + } +} diff --git a/pkg/common/storage/cache/redis/user.go b/pkg/common/storage/cache/redis/user.go index 3de01563b..c05cd3895 100644 --- a/pkg/common/storage/cache/redis/user.go +++ b/pkg/common/storage/cache/redis/user.go @@ -131,6 +131,15 @@ func (u *UserCacheRedis) DelUsersGlobalRecvMsgOpt(userIDs ...string) cache.UserC return cache } +/* + + */ + +type RedisUserOnline struct { + // 平台id, 平台更新时间 + PlatformIDs map[int32]int64 +} + // GetUserStatus get user status. func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) { userStatus := make([]*user.OnlineStatus, 0, len(userIDs))