diff --git a/.github/workflows/go-build-test.yml b/.github/workflows/go-build-test.yml index 4033603e6..9e2aa3f1c 100644 --- a/.github/workflows/go-build-test.yml +++ b/.github/workflows/go-build-test.yml @@ -12,6 +12,10 @@ jobs: go-build: name: Test with go ${{ matrix.go_version }} on ${{ matrix.os }} runs-on: ${{ matrix.os }} + + env: + SHARE_CONFIG_PATH: config/share.yml + permissions: contents: write pull-requests: write @@ -40,6 +44,10 @@ jobs: with: compose-file: "./docker-compose.yml" + - name: Modify Server Configuration + run: | + yq e '.secret = 123456' -i ${{ env.SHARE_CONFIG_PATH }} + # - name: Get Internal IP Address # id: get-ip # run: | @@ -71,6 +79,11 @@ jobs: go mod download go install github.com/magefile/mage@latest + - name: Modify Chat Configuration + run: | + cd ${{ github.workspace }}/chat-repo + yq e '.openIM.secret = 123456' -i ${{ env.SHARE_CONFIG_PATH }} + - name: Build and test Chat Services run: | cd ${{ github.workspace }}/chat-repo @@ -132,7 +145,7 @@ jobs: # Test get admin token get_admin_token_response=$(curl -X POST -H "Content-Type: application/json" -H "operationID: imAdmin" -d '{ - "secret": "openIM123", + "secret": "123456", "platformID": 2, "userID": "imAdmin" }' http://127.0.0.1:10002/auth/get_admin_token) @@ -169,7 +182,8 @@ jobs: contents: write env: SDK_DIR: openim-sdk-core - CONFIG_PATH: config/notification.yml + NOTIFICATION_CONFIG_PATH: config/notification.yml + SHARE_CONFIG_PATH: config/share.yml strategy: matrix: @@ -184,7 +198,7 @@ jobs: uses: actions/checkout@v4 with: repository: "openimsdk/openim-sdk-core" - ref: "release-v3.8" + ref: "main" path: ${{ env.SDK_DIR }} - name: Set up Go ${{ matrix.go_version }} @@ -199,8 +213,9 @@ jobs: - name: Modify Server Configuration run: | - yq e '.groupCreated.isSendMsg = true' -i ${{ env.CONFIG_PATH }} - yq e '.friendApplicationApproved.isSendMsg = true' -i ${{ env.CONFIG_PATH }} + yq e '.groupCreated.isSendMsg = true' -i ${{ env.NOTIFICATION_CONFIG_PATH }} + yq e '.friendApplicationApproved.isSendMsg = true' -i ${{ env.NOTIFICATION_CONFIG_PATH }} + yq e '.secret = 123456' -i ${{ env.SHARE_CONFIG_PATH }} - name: Start Server Services run: | diff --git a/config/share.yml b/config/share.yml index a5fbeac75..0913c1e88 100644 --- a/config/share.yml +++ b/config/share.yml @@ -1,9 +1,9 @@ secret: openIM123 -imAdminUserID: [ imAdmin ] +imAdminUserID: [imAdmin] # 1: For Android, iOS, Windows, Mac, and web platforms, only one instance can be online at a time multiLogin: policy: 1 # max num of tokens in one end - maxNumOneEnd: 30 \ No newline at end of file + maxNumOneEnd: 30 diff --git a/go.mod b/go.mod index 0a9de4010..d65977757 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.79 - github.com/openimsdk/tools v0.0.50-alpha.74 + github.com/openimsdk/protocol v0.0.73-alpha.6 + github.com/openimsdk/tools v0.0.50-alpha.79 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 @@ -219,3 +219,5 @@ require ( golang.org/x/crypto v0.27.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) + +//replace github.com/openimsdk/protocol => /Users/chao/Desktop/code/protocol diff --git a/go.sum b/go.sum index 66af77379..390a51c4a 100644 --- a/go.sum +++ b/go.sum @@ -345,12 +345,12 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= -github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.79 h1:e46no8WVAsmTzyy405klrdoUiG7u+1ohDsXvQuFng4s= -github.com/openimsdk/protocol v0.0.72-alpha.79/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= -github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= +github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= +github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +github.com/openimsdk/protocol v0.0.73-alpha.6 h1:sna9coWG7HN1zObBPtvG0Ki/vzqHXiB4qKbA5P3w7kc= +github.com/openimsdk/protocol v0.0.73-alpha.6/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/tools v0.0.50-alpha.79 h1:jxYEbrzaze4Z2r4NrKad816buZ690ix0L9MTOOOH3ik= +github.com/openimsdk/tools v0.0.50-alpha.79/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/init.go b/internal/api/init.go index 4bd29c9e0..1e0f1075f 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -144,24 +144,23 @@ func Start(ctx context.Context, index int, config *Config) error { } }() - if config.Discovery.Enable == conf.ETCD { - cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames()) - cm.Watch(ctx) - } - - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGTERM) - - shutdown := func() error { - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - err := server.Shutdown(ctx) - if err != nil { - return errs.WrapMsg(err, "shutdown err") - } - return nil - } - disetcd.RegisterShutDown(shutdown) + //if config.Discovery.Enable == conf.ETCD { + // cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames()) + // cm.Watch(ctx) + //} + //sigs := make(chan os.Signal, 1) + //signal.Notify(sigs, syscall.SIGTERM) + //select { + //case val := <-sigs: + // log.ZDebug(ctx, "recv exit", "signal", val.String()) + // cancel(fmt.Errorf("signal %s", val.String())) + //case <-ctx.Done(): + //} + <-apiCtx.Done() + exitCause := context.Cause(apiCtx) + log.ZWarn(ctx, "api server exit", exitCause) + timer := time.NewTimer(time.Second * 15) + defer timer.Stop() select { case <-sigs: program.SIGTERMExit() diff --git a/internal/api/router.go b/internal/api/router.go index 216a43363..c7bc3c724 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -124,6 +124,11 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf userRouterGroup.POST("/add_notification_account", u.AddNotificationAccount) userRouterGroup.POST("/update_notification_account", u.UpdateNotificationAccountInfo) userRouterGroup.POST("/search_notification_account", u.SearchNotificationAccount) + + userRouterGroup.POST("/get_user_client_config", u.GetUserClientConfig) + userRouterGroup.POST("/set_user_client_config", u.SetUserClientConfig) + userRouterGroup.POST("/del_user_client_config", u.DelUserClientConfig) + userRouterGroup.POST("/page_user_client_config", u.PageUserClientConfig) } // friend routing group { diff --git a/internal/api/user.go b/internal/api/user.go index a88f8f65a..7f256f5dd 100644 --- a/internal/api/user.go +++ b/internal/api/user.go @@ -242,3 +242,19 @@ func (u *UserApi) UpdateNotificationAccountInfo(c *gin.Context) { func (u *UserApi) SearchNotificationAccount(c *gin.Context) { a2r.Call(c, user.UserClient.SearchNotificationAccount, u.Client) } + +func (u *UserApi) GetUserClientConfig(c *gin.Context) { + a2r.Call(c, user.UserClient.GetUserClientConfig, u.Client) +} + +func (u *UserApi) SetUserClientConfig(c *gin.Context) { + a2r.Call(c, user.UserClient.SetUserClientConfig, u.Client) +} + +func (u *UserApi) DelUserClientConfig(c *gin.Context) { + a2r.Call(c, user.UserClient.DelUserClientConfig, u.Client) +} + +func (u *UserApi) PageUserClientConfig(c *gin.Context) { + a2r.Call(c, user.UserClient.PageUserClientConfig, u.Client) +} diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 602c4f3ee..fe7995058 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -1369,6 +1369,7 @@ func (g *groupServer) DismissGroup(ctx context.Context, req *pbgroup.DismissGrou if err != nil { return nil, err } + group.Status = constant.GroupStatusDismissed tips := &sdkws.GroupDismissedTips{ Group: g.groupDB2PB(group, owner.UserID, num), OpUser: &sdkws.GroupMemberFullInfo{}, diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index 1aa5333b4..29c4ef634 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -283,7 +283,8 @@ func (g *NotificationSender) fillOpUserByUserID(ctx context.Context, userID stri func (g *NotificationSender) setVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string) { versions := versionctx.GetVersionLog(ctx).Get() - for _, coll := range versions { + for i := len(versions) - 1; i >= 0; i-- { + coll := versions[i] if coll.Name == collName && coll.Doc.DID == id { *version = uint64(coll.Doc.Version) *versionID = coll.Doc.ID.Hex() diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index de1879438..b25eae6b1 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -61,6 +61,13 @@ func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *m return nil, err } resp := &msg.GetConversationsHasReadAndMaxSeqResp{Seqs: make(map[string]*msg.Seqs)} + if req.ReturnPinned { + pinnedConversationIDs, err := m.ConversationLocalCache.GetPinnedConversationIDs(ctx, req.UserID) + if err != nil { + return nil, err + } + resp.PinnedConversationIDs = pinnedConversationIDs + } for conversationID, maxSeq := range maxSeqs { resp.Seqs[conversationID] = &msg.Seqs{ HasReadSeq: hasReadSeqs[conversationID], diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index 97206dd6d..757320dac 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -62,7 +62,7 @@ func (t *thirdServer) InitiateMultipartUpload(ctx context.Context, req *third.In return nil, err } expireTime := time.Now().Add(t.defaultExpire) - result, err := t.s3dataBase.InitiateMultipartUpload(ctx, req.Hash, req.Size, t.defaultExpire, int(req.MaxParts)) + result, err := t.s3dataBase.InitiateMultipartUpload(ctx, req.Hash, req.Size, t.defaultExpire, int(req.MaxParts), req.ContentType) if err != nil { if haErr, ok := errs.Unwrap(err).(*cont.HashAlreadyExistsError); ok { obj := &model.Object{ diff --git a/internal/rpc/user/config.go b/internal/rpc/user/config.go new file mode 100644 index 000000000..5a9a46359 --- /dev/null +++ b/internal/rpc/user/config.go @@ -0,0 +1,71 @@ +package user + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + pbuser "github.com/openimsdk/protocol/user" + "github.com/openimsdk/tools/utils/datautil" +) + +func (s *userServer) GetUserClientConfig(ctx context.Context, req *pbuser.GetUserClientConfigReq) (*pbuser.GetUserClientConfigResp, error) { + if req.UserID != "" { + if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil { + return nil, err + } + if _, err := s.db.GetUserByID(ctx, req.UserID); err != nil { + return nil, err + } + } + res, err := s.clientConfig.GetUserConfig(ctx, req.UserID) + if err != nil { + return nil, err + } + return &pbuser.GetUserClientConfigResp{Configs: res}, nil +} + +func (s *userServer) SetUserClientConfig(ctx context.Context, req *pbuser.SetUserClientConfigReq) (*pbuser.SetUserClientConfigResp, error) { + if err := authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil { + return nil, err + } + if req.UserID != "" { + if _, err := s.db.GetUserByID(ctx, req.UserID); err != nil { + return nil, err + } + } + if err := s.clientConfig.SetUserConfig(ctx, req.UserID, req.Configs); err != nil { + return nil, err + } + return &pbuser.SetUserClientConfigResp{}, nil +} + +func (s *userServer) DelUserClientConfig(ctx context.Context, req *pbuser.DelUserClientConfigReq) (*pbuser.DelUserClientConfigResp, error) { + if err := authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil { + return nil, err + } + if err := s.clientConfig.DelUserConfig(ctx, req.UserID, req.Keys); err != nil { + return nil, err + } + return &pbuser.DelUserClientConfigResp{}, nil +} + +func (s *userServer) PageUserClientConfig(ctx context.Context, req *pbuser.PageUserClientConfigReq) (*pbuser.PageUserClientConfigResp, error) { + if err := authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil { + return nil, err + } + total, res, err := s.clientConfig.GetUserConfigPage(ctx, req.UserID, req.Key, req.Pagination) + if err != nil { + return nil, err + } + return &pbuser.PageUserClientConfigResp{ + Total: total, + Configs: datautil.Slice(res, func(e *model.ClientConfig) *pbuser.ClientConfig { + return &pbuser.ClientConfig{ + UserID: e.UserID, + Key: e.Key, + Value: e.Value, + } + }), + }, nil +} diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index d4fe7ecc4..0e35aba6e 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -51,6 +51,10 @@ import ( "google.golang.org/grpc" ) +const ( + defaultSecret = "openIM123" +) + type userServer struct { pbuser.UnimplementedUserServer online cache.OnlineCache @@ -62,6 +66,7 @@ type userServer struct { webhookClient *webhook.Client groupClient *rpcli.GroupClient relationClient *rpcli.RelationClient + clientConfig controller.ClientConfigDatabase } type Config struct { @@ -94,6 +99,10 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi if err != nil { return err } + clientConfigDB, err := mgo.NewClientConfig(mgocli.GetDB()) + if err != nil { + return err + } msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) if err != nil { return err @@ -118,9 +127,9 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi userNotificationSender: NewUserNotificationSender(config, msgClient, WithUserFunc(database.FindWithError)), config: config, webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), - - groupClient: rpcli.NewGroupClient(groupConn), - relationClient: rpcli.NewRelationClient(friendConn), + clientConfig: controller.NewClientConfigDatabase(clientConfigDB, redis.NewClientConfigCache(rdb, clientConfigDB), mgocli.GetTx()), + groupClient: rpcli.NewGroupClient(groupConn), + relationClient: rpcli.NewRelationClient(friendConn), } pbuser.RegisterUserServer(server, u) return u.db.InitOnce(context.Background(), users) @@ -273,6 +282,10 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR if len(req.Users) == 0 { return nil, errs.ErrArgs.WrapMsg("users is empty") } + // check if secret is changed + if s.config.Share.Secret == defaultSecret { + return nil, servererrs.ErrSecretNotChanged.Wrap() + } if err = authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil { return nil, err diff --git a/pkg/common/servererrs/code.go b/pkg/common/servererrs/code.go index 3d0aa4a71..906f890a5 100644 --- a/pkg/common/servererrs/code.go +++ b/pkg/common/servererrs/code.go @@ -37,7 +37,8 @@ const ( // General error codes. const ( - NoError = 0 // No error + NoError = 0 // No error + DatabaseError = 90002 // Database error (redis/mysql, etc.) NetworkError = 90004 // Network error DataError = 90007 // Data error @@ -45,11 +46,12 @@ const ( CallbackError = 80000 // General error codes. - ServerInternalError = 500 // Server internal error - ArgsError = 1001 // Input parameter error - NoPermissionError = 1002 // Insufficient permission - DuplicateKeyError = 1003 - RecordNotFoundError = 1004 // Record does not exist + ServerInternalError = 500 // Server internal error + ArgsError = 1001 // Input parameter error + NoPermissionError = 1002 // Insufficient permission + DuplicateKeyError = 1003 + RecordNotFoundError = 1004 // Record does not exist + SecretNotChangedError = 1050 // secret not changed // Account error codes. UserIDNotFoundError = 1101 // UserID does not exist or is not registered diff --git a/pkg/common/servererrs/predefine.go b/pkg/common/servererrs/predefine.go index ab09aa512..b1d6b06a9 100644 --- a/pkg/common/servererrs/predefine.go +++ b/pkg/common/servererrs/predefine.go @@ -17,6 +17,8 @@ package servererrs import "github.com/openimsdk/tools/errs" var ( + ErrSecretNotChanged = errs.NewCodeError(SecretNotChangedError, "secret not changed, please change secret in config/share.yml for security reasons") + ErrDatabase = errs.NewCodeError(DatabaseError, "DatabaseError") ErrNetwork = errs.NewCodeError(NetworkError, "NetworkError") ErrCallback = errs.NewCodeError(CallbackError, "CallbackError") diff --git a/pkg/common/storage/cache/cachekey/client_config.go b/pkg/common/storage/cache/cachekey/client_config.go new file mode 100644 index 000000000..16770adef --- /dev/null +++ b/pkg/common/storage/cache/cachekey/client_config.go @@ -0,0 +1,10 @@ +package cachekey + +const ClientConfig = "CLIENT_CONFIG" + +func GetClientConfigKey(userID string) string { + if userID == "" { + return ClientConfig + } + return ClientConfig + ":" + userID +} diff --git a/pkg/common/storage/cache/client_config.go b/pkg/common/storage/cache/client_config.go new file mode 100644 index 000000000..329f25c59 --- /dev/null +++ b/pkg/common/storage/cache/client_config.go @@ -0,0 +1,8 @@ +package cache + +import "context" + +type ClientConfigCache interface { + DeleteUserCache(ctx context.Context, userIDs []string) error + GetUserConfig(ctx context.Context, userID string) (map[string]string, error) +} diff --git a/pkg/common/storage/cache/redis/client_config.go b/pkg/common/storage/cache/redis/client_config.go new file mode 100644 index 000000000..c5a455146 --- /dev/null +++ b/pkg/common/storage/cache/redis/client_config.go @@ -0,0 +1,69 @@ +package redis + +import ( + "context" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/redis/go-redis/v9" +) + +func NewClientConfigCache(rdb redis.UniversalClient, mgo database.ClientConfig) cache.ClientConfigCache { + rc := newRocksCacheClient(rdb) + return &ClientConfigCache{ + mgo: mgo, + rcClient: rc, + delete: rc.GetBatchDeleter(), + } +} + +type ClientConfigCache struct { + mgo database.ClientConfig + rcClient *rocksCacheClient + delete cache.BatchDeleter +} + +func (x *ClientConfigCache) getExpireTime(userID string) time.Duration { + if userID == "" { + return time.Hour * 24 + } else { + return time.Hour + } +} + +func (x *ClientConfigCache) getClientConfigKey(userID string) string { + return cachekey.GetClientConfigKey(userID) +} + +func (x *ClientConfigCache) GetConfig(ctx context.Context, userID string) (map[string]string, error) { + return getCache(ctx, x.rcClient, x.getClientConfigKey(userID), x.getExpireTime(userID), func(ctx context.Context) (map[string]string, error) { + return x.mgo.Get(ctx, userID) + }) +} + +func (x *ClientConfigCache) DeleteUserCache(ctx context.Context, userIDs []string) error { + keys := make([]string, 0, len(userIDs)) + for _, userID := range userIDs { + keys = append(keys, x.getClientConfigKey(userID)) + } + return x.delete.ExecDelWithKeys(ctx, keys) +} + +func (x *ClientConfigCache) GetUserConfig(ctx context.Context, userID string) (map[string]string, error) { + config, err := x.GetConfig(ctx, "") + if err != nil { + return nil, err + } + if userID != "" { + userConfig, err := x.GetConfig(ctx, userID) + if err != nil { + return nil, err + } + for k, v := range userConfig { + config[k] = v + } + } + return config, nil +} diff --git a/pkg/common/storage/controller/client_config.go b/pkg/common/storage/controller/client_config.go new file mode 100644 index 000000000..1c3787634 --- /dev/null +++ b/pkg/common/storage/controller/client_config.go @@ -0,0 +1,58 @@ +package controller + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/pagination" + "github.com/openimsdk/tools/db/tx" +) + +type ClientConfigDatabase interface { + SetUserConfig(ctx context.Context, userID string, config map[string]string) error + GetUserConfig(ctx context.Context, userID string) (map[string]string, error) + DelUserConfig(ctx context.Context, userID string, keys []string) error + GetUserConfigPage(ctx context.Context, userID string, key string, pagination pagination.Pagination) (int64, []*model.ClientConfig, error) +} + +func NewClientConfigDatabase(db database.ClientConfig, cache cache.ClientConfigCache, tx tx.Tx) ClientConfigDatabase { + return &clientConfigDatabase{ + tx: tx, + db: db, + cache: cache, + } +} + +type clientConfigDatabase struct { + tx tx.Tx + db database.ClientConfig + cache cache.ClientConfigCache +} + +func (x *clientConfigDatabase) SetUserConfig(ctx context.Context, userID string, config map[string]string) error { + return x.tx.Transaction(ctx, func(ctx context.Context) error { + if err := x.db.Set(ctx, userID, config); err != nil { + return err + } + return x.cache.DeleteUserCache(ctx, []string{userID}) + }) +} + +func (x *clientConfigDatabase) GetUserConfig(ctx context.Context, userID string) (map[string]string, error) { + return x.cache.GetUserConfig(ctx, userID) +} + +func (x *clientConfigDatabase) DelUserConfig(ctx context.Context, userID string, keys []string) error { + return x.tx.Transaction(ctx, func(ctx context.Context) error { + if err := x.db.Del(ctx, userID, keys); err != nil { + return err + } + return x.cache.DeleteUserCache(ctx, []string{userID}) + }) +} + +func (x *clientConfigDatabase) GetUserConfigPage(ctx context.Context, userID string, key string, pagination pagination.Pagination) (int64, []*model.ClientConfig, error) { + return x.db.GetPage(ctx, userID, key, pagination) +} diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go index 30d8d20ec..9ab31c5a6 100644 --- a/pkg/common/storage/controller/s3.go +++ b/pkg/common/storage/controller/s3.go @@ -33,7 +33,7 @@ type S3Database interface { PartLimit() (*s3.PartLimit, error) PartSize(ctx context.Context, size int64) (int64, error) AuthSign(ctx context.Context, uploadID string, partNumbers []int) (*s3.AuthSignResult, error) - InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*cont.InitiateUploadResult, error) + InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int, contentType string) (*cont.InitiateUploadResult, error) CompleteMultipartUpload(ctx context.Context, uploadID string, parts []string) (*cont.UploadResult, error) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (time.Time, string, error) SetObject(ctx context.Context, info *model.Object) error @@ -73,8 +73,8 @@ func (s *s3Database) AuthSign(ctx context.Context, uploadID string, partNumbers return s.s3.AuthSign(ctx, uploadID, partNumbers) } -func (s *s3Database) InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*cont.InitiateUploadResult, error) { - return s.s3.InitiateUpload(ctx, hash, size, expire, maxParts) +func (s *s3Database) InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int, contentType string) (*cont.InitiateUploadResult, error) { + return s.s3.InitiateUploadContentType(ctx, hash, size, expire, maxParts, contentType) } func (s *s3Database) CompleteMultipartUpload(ctx context.Context, uploadID string, parts []string) (*cont.UploadResult, error) { diff --git a/pkg/common/storage/database/client_config.go b/pkg/common/storage/database/client_config.go new file mode 100644 index 000000000..7fa888d24 --- /dev/null +++ b/pkg/common/storage/database/client_config.go @@ -0,0 +1,15 @@ +package database + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/pagination" +) + +type ClientConfig interface { + Set(ctx context.Context, userID string, config map[string]string) error + Get(ctx context.Context, userID string) (map[string]string, error) + Del(ctx context.Context, userID string, keys []string) error + GetPage(ctx context.Context, userID string, key string, pagination pagination.Pagination) (int64, []*model.ClientConfig, error) +} diff --git a/pkg/common/storage/database/mgo/client_config.go b/pkg/common/storage/database/mgo/client_config.go new file mode 100644 index 000000000..0aa462899 --- /dev/null +++ b/pkg/common/storage/database/mgo/client_config.go @@ -0,0 +1,99 @@ +// Copyright © 2023 OpenIM open source community. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mgo + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/db/pagination" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + + "github.com/openimsdk/tools/errs" +) + +func NewClientConfig(db *mongo.Database) (database.ClientConfig, error) { + coll := db.Collection("config") + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{ + {Key: "key", Value: 1}, + {Key: "user_id", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }, + }) + if err != nil { + return nil, errs.Wrap(err) + } + return &ClientConfig{ + coll: coll, + }, nil +} + +type ClientConfig struct { + coll *mongo.Collection +} + +func (x *ClientConfig) Set(ctx context.Context, userID string, config map[string]string) error { + if len(config) == 0 { + return nil + } + for key, value := range config { + filter := bson.M{"key": key, "user_id": userID} + update := bson.M{ + "value": value, + } + err := mongoutil.UpdateOne(ctx, x.coll, filter, bson.M{"$set": update}, false, options.Update().SetUpsert(true)) + if err != nil { + return err + } + } + return nil +} + +func (x *ClientConfig) Get(ctx context.Context, userID string) (map[string]string, error) { + cs, err := mongoutil.Find[*model.ClientConfig](ctx, x.coll, bson.M{"user_id": userID}) + if err != nil { + return nil, err + } + cm := make(map[string]string) + for _, config := range cs { + cm[config.Key] = config.Value + } + return cm, nil +} + +func (x *ClientConfig) Del(ctx context.Context, userID string, keys []string) error { + if len(keys) == 0 { + return nil + } + return mongoutil.DeleteMany(ctx, x.coll, bson.M{"key": bson.M{"$in": keys}, "user_id": userID}) +} + +func (x *ClientConfig) GetPage(ctx context.Context, userID string, key string, pagination pagination.Pagination) (int64, []*model.ClientConfig, error) { + filter := bson.M{} + if userID != "" { + filter["user_id"] = userID + } + if key != "" { + filter["key"] = key + } + return mongoutil.FindPage[*model.ClientConfig](ctx, x.coll, filter, pagination) +} diff --git a/pkg/common/storage/model/client_config.go b/pkg/common/storage/model/client_config.go new file mode 100644 index 000000000..f06e29102 --- /dev/null +++ b/pkg/common/storage/model/client_config.go @@ -0,0 +1,7 @@ +package model + +type ClientConfig struct { + Key string `bson:"key"` + UserID string `bson:"user_id"` + Value string `bson:"value"` +} diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index 70f5acfd1..162fda596 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -16,6 +16,7 @@ package rpccache import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/localcache" @@ -153,6 +154,26 @@ func (c *ConversationLocalCache) getConversationNotReceiveMessageUserIDs(ctx con })) } +func (c *ConversationLocalCache) getPinnedConversationIDs(ctx context.Context, userID string) (val []string, err error) { + log.ZDebug(ctx, "ConversationLocalCache getPinnedConversations req", "userID", userID) + defer func() { + if err == nil { + log.ZDebug(ctx, "ConversationLocalCache getPinnedConversations return", "userID", userID, "value", val) + } else { + log.ZError(ctx, "ConversationLocalCache getPinnedConversations return", err, "userID", userID) + } + }() + var cache cacheProto[pbconversation.GetPinnedConversationIDsResp] + resp, err := cache.Unmarshal(c.local.Get(ctx, cachekey.GetPinnedConversationIDs(userID), func(ctx context.Context) ([]byte, error) { + log.ZDebug(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs rpc", "userID", userID) + return cache.Marshal(c.client.ConversationClient.GetPinnedConversationIDs(ctx, &pbconversation.GetPinnedConversationIDsReq{UserID: userID})) + })) + if err != nil { + return nil, err + } + return resp.ConversationIDs, nil +} + func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { res, err := c.getConversationNotReceiveMessageUserIDs(ctx, conversationID) if err != nil { @@ -168,3 +189,7 @@ func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDMap(ctx c } return datautil.SliceSet(res.UserIDs), nil } + +func (c *ConversationLocalCache) GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) { + return c.getPinnedConversationIDs(ctx, userID) +} diff --git a/tools/s3/internal/conversion.go b/tools/s3/internal/conversion.go index ba2174535..af391ec42 100644 --- a/tools/s3/internal/conversion.go +++ b/tools/s3/internal/conversion.go @@ -4,6 +4,11 @@ import ( "context" "errors" "fmt" + "log" + "net/http" + "path/filepath" + "time" + "github.com/mitchellh/mapstructure" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" @@ -19,10 +24,6 @@ import ( "github.com/openimsdk/tools/s3/oss" "github.com/spf13/viper" "go.mongodb.org/mongo-driver/mongo" - "log" - "net/http" - "path/filepath" - "time" ) const defaultTimeout = time.Second * 10 @@ -159,7 +160,7 @@ func doObject(db database.ObjectInfo, newS3, oldS3 s3.Interface, skip int) (*Res if err != nil { return nil, err } - putURL, err := newS3.PresignedPutObject(ctx, obj.Key, time.Hour) + putURL, err := newS3.PresignedPutObject(ctx, obj.Key, time.Hour, &s3.PutOption{ContentType: obj.ContentType}) if err != nil { return nil, err } @@ -176,7 +177,7 @@ func doObject(db database.ObjectInfo, newS3, oldS3 s3.Interface, skip int) (*Res return nil, fmt.Errorf("download object failed %s", downloadResp.Status) } log.Printf("file size %d", obj.Size) - request, err := http.NewRequest(http.MethodPut, putURL, downloadResp.Body) + request, err := http.NewRequest(http.MethodPut, putURL.URL, downloadResp.Body) if err != nil { return nil, err } diff --git a/tools/stress-test/README.md b/tools/stress-test/README.md new file mode 100644 index 000000000..531233a20 --- /dev/null +++ b/tools/stress-test/README.md @@ -0,0 +1,25 @@ +# Stress Test + +## Usage + +You need set `TestTargetUserList` and `DefaultGroupID` variables. + +### Build + +```bash +go build -o _output/bin/tools/linux/amd64/stress-test tools/stress-test/main.go + +# or + +go build -o tools/stress-test/stress-test tools/stress-test/main.go +``` + +### Excute + +```bash +_output/bin/tools/linux/amd64/stress-test -c config/ + +#or + +tools/stress-test/stress-test -c config/ +``` diff --git a/tools/stress-test/main.go b/tools/stress-test/main.go index 6adbd12ee..f845b5e93 100755 --- a/tools/stress-test/main.go +++ b/tools/stress-test/main.go @@ -384,6 +384,7 @@ func main() { os.Exit(1) return } + // Invite To Group if err = st.InviteToGroup(st.Ctx, userCreatedID); err != nil { log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", userCreatedID) @@ -416,8 +417,8 @@ func main() { case <-ticker.C: // Send Message - if err = st.SendMsg(st.Ctx, st.DefaultUserID); err != nil { - log.ZError(st.Ctx, "Send Message failed.", err, "UserID", st.DefaultUserID) + if err = st.SendMsg(st.Ctx, st.DefaultSendUserID); err != nil { + log.ZError(st.Ctx, "Send Message failed.", err, "UserID", st.DefaultSendUserID) continue } } diff --git a/version/version.go b/version/version.go index 23b3a82f5..32ad27808 100644 --- a/version/version.go +++ b/version/version.go @@ -1,6 +1,14 @@ package version -import _ "embed" +import ( + _ "embed" + "strings" +) //go:embed version var Version string + +func init() { + Version = strings.Trim(Version, "\n") + Version = strings.TrimSpace(Version) +}