feat: support app update service (#2794)

* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* fix: component

* fix: getConversationInfo

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* fix: minio config url recognition error

* update gomake version

* update gomake version

* fix: seq conversion bug

* fix: redis pipe exec

* fix: ImportFriends

* fix: A large number of logs keysAndValues ​​length is not even

* feat: mark read aggregate write

* feat: online status supports redis cluster

* feat: online status supports redis cluster

* feat: online status supports redis cluster

* merge

* merge

* read seq is written to mongo

* read seq is written to mongo

* fix: invitation to join group notification

* fix: friend op_user_id

* feat: optimizing asynchronous context

* feat: optimizing memamq size

* feat: add GetSeqMessage

* feat: GroupApplicationAgreeMemberEnterNotification

* feat: GroupApplicationAgreeMemberEnterNotification

* feat: go.mod

* feat: go.mod

* feat: join group notification and get seq

* feat: join group notification and get seq

* feat: avoid pulling messages from sessions with a large number of max seq values of 0

* feat: API supports gzip

* go.mod

* fix: nil pointer error on close

* fix: listen error

* fix: listen error

* update go.mod

* feat: add log

* fix: token parse token value

* fix: GetMsgBySeqs boundary issues

* fix: sn_ not sort

* fix: sn_ not sort

* fix: sn_ not sort

* fix: jssdk add

* fix: jssdk support

* fix: jssdk support

* fix: jssdk support

* fix: the message I sent is not set to read seq in mongodb

* fix: cannot modify group member avatars

* fix: MemberEnterNotification

* fix: MemberEnterNotification

* fix: MsgData status

* feat: add ApplicationVersion

---------

Co-authored-by: withchao <withchao@users.noreply.github.com>
pull/2796/head
chao 2 months ago committed by GitHub
parent 312c8ba9d6
commit 649250b7c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.46
github.com/openimsdk/protocol v0.0.72-alpha.47
github.com/openimsdk/tools v0.0.50-alpha.16
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0

@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.46 h1:1LZlfEHLzw1F4afFmqBczmXKJWm5rUQ+yr8rJ4oyEAc=
github.com/openimsdk/protocol v0.0.72-alpha.46/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.72-alpha.47 h1:FGHnEwsA05GxT3vnz7YH3fbVkuoO3P71ZZgkQQ71MjA=
github.com/openimsdk/protocol v0.0.72-alpha.47/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc=
github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=

@ -198,6 +198,13 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
objectGroup.POST("/initiate_form_data", t.InitiateFormData)
objectGroup.POST("/complete_form_data", t.CompleteFormData)
objectGroup.GET("/*name", t.ObjectRedirect)
applicationGroup := r.Group("application")
applicationGroup.POST("/add_version", t.AddApplicationVersion)
applicationGroup.POST("/update_version", t.UpdateApplicationVersion)
applicationGroup.POST("/delete_version", t.DeleteApplicationVersion)
applicationGroup.POST("/latest_version", t.LatestApplicationVersion)
applicationGroup.POST("/page_versions", t.PageApplicationVersion)
}
// Message
msgGroup := r.Group("/msg")
@ -290,4 +297,6 @@ func GinParseToken(authRPC *rpcclient.Auth) gin.HandlerFunc {
var Whitelist = []string{
"/auth/get_admin_token",
"/auth/parse_token",
"/application/latest_version",
"/application/page_versions",
}

@ -170,3 +170,23 @@ func (o *ThirdApi) SearchLogs(c *gin.Context) {
func (o *ThirdApi) GetPrometheus(c *gin.Context) {
c.Redirect(http.StatusFound, o.GrafanaUrl)
}
func (o *ThirdApi) LatestApplicationVersion(c *gin.Context) {
a2r.Call(third.ThirdClient.LatestApplicationVersion, o.Client, c)
}
func (o *ThirdApi) AddApplicationVersion(c *gin.Context) {
a2r.Call(third.ThirdClient.AddApplicationVersion, o.Client, c)
}
func (o *ThirdApi) UpdateApplicationVersion(c *gin.Context) {
a2r.Call(third.ThirdClient.UpdateApplicationVersion, o.Client, c)
}
func (o *ThirdApi) DeleteApplicationVersion(c *gin.Context) {
a2r.Call(third.ThirdClient.DeleteApplicationVersion, o.Client, c)
}
func (o *ThirdApi) PageApplicationVersion(c *gin.Context) {
a2r.Call(third.ThirdClient.PageApplicationVersion, o.Client, c)
}

@ -0,0 +1,117 @@
package third
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"time"
)
func IsNotFound(err error) bool {
switch errs.Unwrap(err) {
case redis.Nil, mongo.ErrNoDocuments:
return true
default:
return false
}
}
func (t *thirdServer) db2pbApplication(val *model.Application) *third.ApplicationVersion {
return &third.ApplicationVersion{
Id: val.ID.Hex(),
Platform: val.Platform,
Version: val.Version,
Url: val.Url,
Text: val.Text,
Force: val.Force,
Latest: val.Latest,
CreateTime: val.CreateTime.UnixMilli(),
}
}
func (t *thirdServer) LatestApplicationVersion(ctx context.Context, req *third.LatestApplicationVersionReq) (*third.LatestApplicationVersionResp, error) {
res, err := t.applicationDatabase.LatestVersion(ctx, req.Platform)
if err == nil {
return &third.LatestApplicationVersionResp{Version: t.db2pbApplication(res)}, nil
} else if IsNotFound(err) {
return &third.LatestApplicationVersionResp{}, nil
} else {
return nil, err
}
}
func (t *thirdServer) AddApplicationVersion(ctx context.Context, req *third.AddApplicationVersionReq) (*third.AddApplicationVersionResp, error) {
if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil {
return nil, err
}
val := &model.Application{
ID: primitive.NewObjectID(),
Platform: req.Platform,
Version: req.Version,
Url: req.Url,
Text: req.Text,
Force: req.Force,
Latest: req.Latest,
CreateTime: time.Now(),
}
if err := t.applicationDatabase.AddVersion(ctx, val); err != nil {
return nil, err
}
return &third.AddApplicationVersionResp{}, nil
}
func (t *thirdServer) UpdateApplicationVersion(ctx context.Context, req *third.UpdateApplicationVersionReq) (*third.UpdateApplicationVersionResp, error) {
if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil {
return nil, err
}
oid, err := primitive.ObjectIDFromHex(req.Id)
if err != nil {
return nil, errs.ErrArgs.WrapMsg("invalid id " + err.Error())
}
update := make(map[string]any)
putUpdate(update, "platform", req.Platform)
putUpdate(update, "version", req.Version)
putUpdate(update, "url", req.Url)
putUpdate(update, "text", req.Text)
putUpdate(update, "force", req.Force)
putUpdate(update, "latest", req.Latest)
if err := t.applicationDatabase.UpdateVersion(ctx, oid, update); err != nil {
return nil, err
}
return &third.UpdateApplicationVersionResp{}, nil
}
func (t *thirdServer) DeleteApplicationVersion(ctx context.Context, req *third.DeleteApplicationVersionReq) (*third.DeleteApplicationVersionResp, error) {
if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil {
return nil, err
}
ids := make([]primitive.ObjectID, 0, len(req.Id))
for _, id := range req.Id {
oid, err := primitive.ObjectIDFromHex(id)
if err != nil {
return nil, errs.ErrArgs.WrapMsg("invalid id " + err.Error())
}
ids = append(ids, oid)
}
if err := t.applicationDatabase.DeleteVersion(ctx, ids); err != nil {
return nil, err
}
return &third.DeleteApplicationVersionResp{}, nil
}
func (t *thirdServer) PageApplicationVersion(ctx context.Context, req *third.PageApplicationVersionReq) (*third.PageApplicationVersionResp, error) {
total, res, err := t.applicationDatabase.PageVersion(ctx, req.Platform, req.Pagination)
if err != nil {
return nil, err
}
return &third.PageApplicationVersionResp{
Total: total,
Versions: datautil.Slice(res, t.db2pbApplication),
}, nil
}

@ -38,12 +38,13 @@ import (
)
type thirdServer struct {
thirdDatabase controller.ThirdDatabase
s3dataBase controller.S3Database
userRpcClient rpcclient.UserRpcClient
defaultExpire time.Duration
config *Config
minio *minio.Minio
thirdDatabase controller.ThirdDatabase
s3dataBase controller.S3Database
userRpcClient rpcclient.UserRpcClient
defaultExpire time.Duration
config *Config
minio *minio.Minio
applicationDatabase controller.ApplicationDatabase
}
type Config struct {
@ -74,6 +75,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
applicationMgo, err := mgo.NewApplicationMgo(mgocli.GetDB())
if err != nil {
return err
}
// Select the oss method according to the profile policy
enable := config.RpcConfig.Object.Enable
var (
@ -98,12 +104,13 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
}
localcache.InitLocalCache(&config.LocalCacheConfig)
third.RegisterThirdServer(server, &thirdServer{
thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb),
userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID),
s3dataBase: controller.NewS3Database(rdb, o, s3db),
defaultExpire: time.Hour * 24 * 7,
config: config,
minio: minioCli,
thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb),
userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID),
s3dataBase: controller.NewS3Database(rdb, o, s3db),
defaultExpire: time.Hour * 24 * 7,
config: config,
minio: minioCli,
applicationDatabase: controller.NewApplicationDatabase(applicationMgo, redis.NewApplicationRedisCache(applicationMgo, rdb)),
})
return nil
}

@ -82,3 +82,11 @@ func checkValidObjectName(objectName string) error {
func (t *thirdServer) IsManagerUserID(opUserID string) bool {
return authverify.IsManagerUserID(opUserID, t.config.Share.IMAdminUserID)
}
func putUpdate[T any](update map[string]any, name string, val interface{ GetValuePtr() *T }) {
ptrVal := val.GetValuePtr()
if ptrVal == nil {
return
}
update[name] = *ptrVal
}

@ -0,0 +1,11 @@
package cache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
)
type ApplicationCache interface {
LatestVersion(ctx context.Context, platform string) (*model.Application, error)
DeleteCache(ctx context.Context, platforms []string) error
}

@ -0,0 +1,9 @@
package cachekey
const (
ApplicationLatestVersion = "APPLICATION_LATEST_VERSION:"
)
func GetApplicationLatestVersionKey(platform string) string {
return ApplicationLatestVersion + platform
}

@ -0,0 +1,43 @@
package redis
import (
"context"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
"time"
)
func NewApplicationRedisCache(db database.Application, rdb redis.UniversalClient) *ApplicationRedisCache {
return &ApplicationRedisCache{
db: db,
rcClient: rockscache.NewClient(rdb, *GetRocksCacheOptions()),
deleter: NewBatchDeleterRedis(rdb, GetRocksCacheOptions(), nil),
expireTime: time.Hour * 24 * 7,
}
}
type ApplicationRedisCache struct {
db database.Application
rcClient *rockscache.Client
deleter *BatchDeleterRedis
expireTime time.Duration
}
func (a *ApplicationRedisCache) LatestVersion(ctx context.Context, platform string) (*model.Application, error) {
return getCache(ctx, a.rcClient, cachekey.GetApplicationLatestVersionKey(platform), a.expireTime, func(ctx context.Context) (*model.Application, error) {
return a.db.LatestVersion(ctx, platform)
})
}
func (a *ApplicationRedisCache) DeleteCache(ctx context.Context, platforms []string) error {
if len(platforms) == 0 {
return nil
}
return a.deleter.ExecDelWithKeys(ctx, datautil.Slice(platforms, func(platform string) string {
return cachekey.GetApplicationLatestVersionKey(platform)
}))
}

@ -0,0 +1,69 @@
package controller
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/pagination"
"go.mongodb.org/mongo-driver/bson/primitive"
)
type ApplicationDatabase interface {
LatestVersion(ctx context.Context, platform string) (*model.Application, error)
AddVersion(ctx context.Context, val *model.Application) error
UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error
DeleteVersion(ctx context.Context, id []primitive.ObjectID) error
PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error)
}
func NewApplicationDatabase(db database.Application, cache cache.ApplicationCache) ApplicationDatabase {
return &applicationDatabase{db: db, cache: cache}
}
type applicationDatabase struct {
db database.Application
cache cache.ApplicationCache
}
func (a *applicationDatabase) LatestVersion(ctx context.Context, platform string) (*model.Application, error) {
return a.cache.LatestVersion(ctx, platform)
}
func (a *applicationDatabase) AddVersion(ctx context.Context, val *model.Application) error {
if err := a.db.AddVersion(ctx, val); err != nil {
return err
}
return a.cache.DeleteCache(ctx, []string{val.Platform})
}
func (a *applicationDatabase) UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error {
platforms, err := a.db.FindPlatform(ctx, []primitive.ObjectID{id})
if err != nil {
return err
}
if err := a.db.UpdateVersion(ctx, id, update); err != nil {
return err
}
if p, ok := update["platform"]; ok {
if val, ok := p.(string); ok {
platforms = append(platforms, val)
}
}
return a.cache.DeleteCache(ctx, platforms)
}
func (a *applicationDatabase) DeleteVersion(ctx context.Context, id []primitive.ObjectID) error {
platforms, err := a.db.FindPlatform(ctx, id)
if err != nil {
return err
}
if err := a.db.DeleteVersion(ctx, id); err != nil {
return err
}
return a.cache.DeleteCache(ctx, platforms)
}
func (a *applicationDatabase) PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) {
return a.db.PageVersion(ctx, platforms, page)
}

@ -0,0 +1,17 @@
package database
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/pagination"
"go.mongodb.org/mongo-driver/bson/primitive"
)
type Application interface {
LatestVersion(ctx context.Context, platform string) (*model.Application, error)
AddVersion(ctx context.Context, val *model.Application) error
UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error
DeleteVersion(ctx context.Context, id []primitive.ObjectID) error
PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error)
FindPlatform(ctx context.Context, id []primitive.ObjectID) ([]string, error)
}

@ -0,0 +1,82 @@
package mgo
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/pagination"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func NewApplicationMgo(db *mongo.Database) (*ApplicationMgo, error) {
coll := db.Collection("application")
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{
{Key: "platform", Value: 1},
{Key: "version", Value: 1},
},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{
{Key: "latest", Value: -1},
},
},
})
if err != nil {
return nil, err
}
return &ApplicationMgo{coll: coll}, nil
}
type ApplicationMgo struct {
coll *mongo.Collection
}
func (a *ApplicationMgo) sort() any {
return bson.D{{"latest", -1}, {"_id", -1}}
}
func (a *ApplicationMgo) LatestVersion(ctx context.Context, platform string) (*model.Application, error) {
return mongoutil.FindOne[*model.Application](ctx, a.coll, bson.M{"platform": platform}, options.FindOne().SetSort(a.sort()))
}
func (a *ApplicationMgo) AddVersion(ctx context.Context, val *model.Application) error {
if val.ID.IsZero() {
val.ID = primitive.NewObjectID()
}
return mongoutil.InsertMany(ctx, a.coll, []*model.Application{val})
}
func (a *ApplicationMgo) UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error {
if len(update) == 0 {
return nil
}
return mongoutil.UpdateOne(ctx, a.coll, bson.M{"_id": id}, bson.M{"$set": update}, true)
}
func (a *ApplicationMgo) DeleteVersion(ctx context.Context, id []primitive.ObjectID) error {
if len(id) == 0 {
return nil
}
return mongoutil.DeleteMany(ctx, a.coll, bson.M{"_id": bson.M{"$in": id}})
}
func (a *ApplicationMgo) PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) {
filter := bson.M{}
if len(platforms) > 0 {
filter["platform"] = bson.M{"$in": platforms}
}
return mongoutil.FindPage[*model.Application](ctx, a.coll, filter, page, options.Find().SetSort(a.sort()))
}
func (a *ApplicationMgo) FindPlatform(ctx context.Context, id []primitive.ObjectID) ([]string, error) {
if len(id) == 0 {
return nil, nil
}
return mongoutil.Find[string](ctx, a.coll, bson.M{"_id": bson.M{"$in": id}}, options.Find().SetProjection(bson.M{"_id": 0, "platform": 1}))
}

@ -0,0 +1,17 @@
package model
import (
"go.mongodb.org/mongo-driver/bson/primitive"
"time"
)
type Application struct {
ID primitive.ObjectID `bson:"_id"`
Platform string `bson:"platform"`
Version string `bson:"version"`
Url string `bson:"url"`
Text string `bson:"text"`
Force bool `bson:"force"`
Latest bool `bson:"latest"`
CreateTime time.Time `bson:"create_time"`
}
Loading…
Cancel
Save