diff --git a/config/log.yml b/config/log.yml index 38926479c..1f5641622 100644 --- a/config/log.yml +++ b/config/log.yml @@ -1,7 +1,7 @@ # Log storage path, default is acceptable, change to a full path if modification is needed storageLocation: ../../../../logs/ # Log rotation period (in hours), default is acceptable -rotationTime: 1 +rotationTime: 24 # Maximum size of each log file (in MB), default is acceptable, it means unlimited maxSize: 0 # Number of log files to retain, default is acceptable, it means whenever the log file is rotated, the old log file will be deleted diff --git a/config/openim-rpc-third.yml b/config/openim-rpc-third.yml index bde38ccc4..6fb60f47f 100644 --- a/config/openim-rpc-third.yml +++ b/config/openim-rpc-third.yml @@ -29,4 +29,12 @@ object: accessKeyID: '' accessKeySecret: '' sessionToken: '' + publicRead: false + kodo: + endpoint: "http://s3.cn-south-1.qiniucs.com" + bucket: "kodo-bucket-test" + bucketURL: "http://kodo-bucket-test-oetobfb.qiniudns.com" + accessKeyID: '' + accessKeySecret: '' + sessionToken: '' publicRead: false \ No newline at end of file diff --git a/go.mod b/go.mod index 5ea0a62cc..63fe0d47e 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,24 @@ require ( cloud.google.com/go/longrunning v0.5.4 // indirect cloud.google.com/go/storage v1.36.0 // indirect github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible // indirect + github.com/aws/aws-sdk-go-v2 v1.23.1 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1 // indirect + github.com/aws/aws-sdk-go-v2/config v1.25.4 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.16.3 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.4 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.4 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.4 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.43.1 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.17.3 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.20.1 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.25.4 // indirect + github.com/aws/smithy-go v1.17.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.9.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect @@ -118,6 +136,7 @@ require ( github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + github.com/qiniu/go-sdk/v7 v7.18.2 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/rs/xid v1.5.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 0581a025b..1270eb978 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -20,6 +20,7 @@ import ( "runtime/debug" "sync" "sync/atomic" + "time" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/protocol/constant" @@ -72,6 +73,8 @@ type Client struct { closed atomic.Bool closedErr error token string + hbCtx context.Context + hbCancel context.CancelFunc } // ResetClient updates the client's state with new connection and context information. @@ -88,6 +91,7 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer c.closed.Store(false) c.closedErr = nil c.token = ctx.GetToken() + c.hbCtx, c.hbCancel = context.WithCancel(c.ctx) } func (c *Client) pingHandler(_ string) error { @@ -98,6 +102,13 @@ func (c *Client) pingHandler(_ string) error { return c.writePongMsg() } +func (c *Client) pongHandler(_ string) error { + if err := c.conn.SetReadDeadline(pongWait); err != nil { + return err + } + return nil +} + // readMessage continuously reads messages from the connection. func (c *Client) readMessage() { defer func() { @@ -110,7 +121,9 @@ func (c *Client) readMessage() { c.conn.SetReadLimit(maxMessageSize) _ = c.conn.SetReadDeadline(pongWait) + c.conn.SetPongHandler(c.pongHandler) c.conn.SetPingHandler(c.pingHandler) + c.activeHeartbeat(c.hbCtx) for { log.ZDebug(c.ctx, "readMessage") @@ -147,6 +160,7 @@ func (c *Client) readMessage() { case CloseMessage: c.closedErr = ErrClientClosed return + default: } } @@ -235,6 +249,7 @@ func (c *Client) close() { c.closed.Store(true) c.conn.Close() + c.hbCancel() // Close server-initiated heartbeat. c.longConnServer.UnRegister(c) } @@ -321,6 +336,44 @@ func (c *Client) writeBinaryMsg(resp Resp) error { return c.conn.WriteMessage(MessageBinary, encodedBuf) } +// Actively initiate Heartbeat when platform in Web. +func (c *Client) activeHeartbeat(ctx context.Context) { + if c.PlatformID == constant.WebPlatformID { + go func() { + log.ZDebug(ctx, "server initiative send heartbeat start.") + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := c.writePingMsg(); err != nil { + log.ZWarn(c.ctx, "send Ping Message error.", err) + return + } + case <-c.hbCtx.Done(): + return + } + } + }() + } +} +func (c *Client) writePingMsg() error { + if c.closed.Load() { + return nil + } + + c.w.Lock() + defer c.w.Unlock() + + err := c.conn.SetWriteDeadline(writeWait) + if err != nil { + return err + } + + return c.conn.WriteMessage(PingMessage, nil) +} + func (c *Client) writePongMsg() error { if c.closed.Load() { return nil diff --git a/internal/msggateway/constant.go b/internal/msggateway/constant.go index 64664ac0a..125be1635 100644 --- a/internal/msggateway/constant.go +++ b/internal/msggateway/constant.go @@ -53,6 +53,9 @@ const ( // Time allowed to read the next pong message from the peer. pongWait = 30 * time.Second + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + // Maximum message size allowed from peer. maxMessageSize = 51200 ) diff --git a/internal/msggateway/long_conn.go b/internal/msggateway/long_conn.go index 7d5bef4c3..c1b3e27c9 100644 --- a/internal/msggateway/long_conn.go +++ b/internal/msggateway/long_conn.go @@ -16,10 +16,11 @@ package msggateway import ( "encoding/json" - "github.com/openimsdk/tools/apiresp" "net/http" "time" + "github.com/openimsdk/tools/apiresp" + "github.com/gorilla/websocket" "github.com/openimsdk/tools/errs" ) diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/ws_server.go similarity index 100% rename from internal/msggateway/n_ws_server.go rename to internal/msggateway/ws_server.go diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index d0d88b174..3ae560033 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -33,6 +33,7 @@ import ( "github.com/openimsdk/tools/s3/cos" "github.com/openimsdk/tools/s3/minio" "github.com/openimsdk/tools/s3/oss" + "github.com/openimsdk/tools/s3/kodo" "google.golang.org/grpc" ) @@ -82,6 +83,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg o, err = cos.NewCos(*config.RpcConfig.Object.Cos.Build()) case "oss": o, err = oss.NewOSS(*config.RpcConfig.Object.Oss.Build()) + case "kodo": + o, err = kodo.NewKodo(*config.RpcConfig.Object.Kodo.Build()) default: err = fmt.Errorf("invalid object enable: %s", enable) } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 536e4d547..d8916ad04 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -21,6 +21,7 @@ import ( "github.com/openimsdk/tools/s3/cos" "github.com/openimsdk/tools/s3/minio" "github.com/openimsdk/tools/s3/oss" + "github.com/openimsdk/tools/s3/kodo" "strings" "time" ) @@ -281,15 +282,7 @@ type Third struct { Enable string `mapstructure:"enable"` Cos Cos `mapstructure:"cos"` Oss Oss `mapstructure:"oss"` - Kodo struct { - Endpoint string `mapstructure:"endpoint"` - Bucket string `mapstructure:"bucket"` - BucketURL string `mapstructure:"bucketURL"` - AccessKeyID string `mapstructure:"accessKeyID"` - AccessKeySecret string `mapstructure:"accessKeySecret"` - SessionToken string `mapstructure:"sessionToken"` - PublicRead bool `mapstructure:"publicRead"` - } `mapstructure:"kodo"` + Kodo Kodo `mapstructure:"kodo"` Aws struct { Endpoint string `mapstructure:"endpoint"` Region string `mapstructure:"region"` @@ -317,6 +310,16 @@ type Oss struct { PublicRead bool `mapstructure:"publicRead"` } +type Kodo struct { + Endpoint string `mapstructure:"endpoint"` + Bucket string `mapstructure:"bucket"` + BucketURL string `mapstructure:"bucketURL"` + AccessKeyID string `mapstructure:"accessKeyID"` + AccessKeySecret string `mapstructure:"accessKeySecret"` + SessionToken string `mapstructure:"sessionToken"` + PublicRead bool `mapstructure:"publicRead"` +} + type User struct { RPC struct { RegisterIP string `mapstructure:"registerIP"` @@ -528,6 +531,18 @@ func (o *Oss) Build() *oss.Config { } } +func (o *Kodo) Build() *kodo.Config { + return &kodo.Config{ + Endpoint: o.Endpoint, + Bucket: o.Bucket, + BucketURL: o.BucketURL, + AccessKeyID: o.AccessKeyID, + AccessKeySecret: o.AccessKeySecret, + SessionToken: o.SessionToken, + PublicRead: o.PublicRead, + } +} + func (l *CacheConfig) Failed() time.Duration { return time.Second * time.Duration(l.FailedExpire) } diff --git a/pkg/common/storage/controller/auth.go b/pkg/common/storage/controller/auth.go index fbfe30836..b725513d9 100644 --- a/pkg/common/storage/controller/auth.go +++ b/pkg/common/storage/controller/auth.go @@ -55,7 +55,6 @@ func (a *authDatabase) SetTokenMapByUidPid(ctx context.Context, userID string, p // Create Token. func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) { - isCreate := true // flag is create or update tokens, err := a.cache.GetTokensWithoutError(ctx, userID, platformID) if err != nil { return "", err @@ -66,9 +65,6 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI if err != nil || v != constant.NormalToken { deleteTokenKey = append(deleteTokenKey, k) } - if v == constant.NormalToken { - isCreate = false - } } if len(deleteTokenKey) != 0 { err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey) @@ -84,16 +80,8 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI return "", errs.WrapMsg(err, "token.SignedString") } - if isCreate { - // should create,should specify expiration time - if err = a.cache.SetTokenFlagEx(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil { - return "", err - } - } else { - // should update - if err = a.cache.SetTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil { - return "", err - } + if err = a.cache.SetTokenFlagEx(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil { + return "", err } return tokenString, nil }