Merge remote-tracking branch 'origin/main' into convincr

pull/2408/head
withchao 1 year ago
commit 1e5e87b7e3

@ -1,7 +1,7 @@
# Log storage path, default is acceptable, change to a full path if modification is needed
storageLocation: ../../../../logs/
# Log rotation period (in hours), default is acceptable
rotationTime: 1
rotationTime: 24
# Maximum size of each log file (in MB), default is acceptable, it means unlimited
maxSize: 0
# Number of log files to retain, default is acceptable, it means whenever the log file is rotated, the old log file will be deleted

@ -29,4 +29,12 @@ object:
accessKeyID: ''
accessKeySecret: ''
sessionToken: ''
publicRead: false
kodo:
endpoint: "http://s3.cn-south-1.qiniucs.com"
bucket: "kodo-bucket-test"
bucketURL: "http://kodo-bucket-test-oetobfb.qiniudns.com"
accessKeyID: ''
accessKeySecret: ''
sessionToken: ''
publicRead: false

@ -53,6 +53,24 @@ require (
cloud.google.com/go/longrunning v0.5.4 // indirect
cloud.google.com/go/storage v1.36.0 // indirect
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible // indirect
github.com/aws/aws-sdk-go-v2 v1.23.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1 // indirect
github.com/aws/aws-sdk-go-v2/config v1.25.4 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.16.3 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.4 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.43.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.17.3 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.20.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.25.4 // indirect
github.com/aws/smithy-go v1.17.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
@ -118,6 +136,7 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/qiniu/go-sdk/v7 v7.18.2 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rs/xid v1.5.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect

@ -20,6 +20,7 @@ import (
"runtime/debug"
"sync"
"sync/atomic"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
@ -72,6 +73,8 @@ type Client struct {
closed atomic.Bool
closedErr error
token string
hbCtx context.Context
hbCancel context.CancelFunc
}
// ResetClient updates the client's state with new connection and context information.
@ -88,6 +91,7 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer
c.closed.Store(false)
c.closedErr = nil
c.token = ctx.GetToken()
c.hbCtx, c.hbCancel = context.WithCancel(c.ctx)
}
func (c *Client) pingHandler(_ string) error {
@ -98,6 +102,13 @@ func (c *Client) pingHandler(_ string) error {
return c.writePongMsg()
}
func (c *Client) pongHandler(_ string) error {
if err := c.conn.SetReadDeadline(pongWait); err != nil {
return err
}
return nil
}
// readMessage continuously reads messages from the connection.
func (c *Client) readMessage() {
defer func() {
@ -110,7 +121,9 @@ func (c *Client) readMessage() {
c.conn.SetReadLimit(maxMessageSize)
_ = c.conn.SetReadDeadline(pongWait)
c.conn.SetPongHandler(c.pongHandler)
c.conn.SetPingHandler(c.pingHandler)
c.activeHeartbeat(c.hbCtx)
for {
log.ZDebug(c.ctx, "readMessage")
@ -147,6 +160,7 @@ func (c *Client) readMessage() {
case CloseMessage:
c.closedErr = ErrClientClosed
return
default:
}
}
@ -235,6 +249,7 @@ func (c *Client) close() {
c.closed.Store(true)
c.conn.Close()
c.hbCancel() // Close server-initiated heartbeat.
c.longConnServer.UnRegister(c)
}
@ -321,6 +336,44 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
return c.conn.WriteMessage(MessageBinary, encodedBuf)
}
// Actively initiate Heartbeat when platform in Web.
func (c *Client) activeHeartbeat(ctx context.Context) {
if c.PlatformID == constant.WebPlatformID {
go func() {
log.ZDebug(ctx, "server initiative send heartbeat start.")
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := c.writePingMsg(); err != nil {
log.ZWarn(c.ctx, "send Ping Message error.", err)
return
}
case <-c.hbCtx.Done():
return
}
}
}()
}
}
func (c *Client) writePingMsg() error {
if c.closed.Load() {
return nil
}
c.w.Lock()
defer c.w.Unlock()
err := c.conn.SetWriteDeadline(writeWait)
if err != nil {
return err
}
return c.conn.WriteMessage(PingMessage, nil)
}
func (c *Client) writePongMsg() error {
if c.closed.Load() {
return nil

@ -53,6 +53,9 @@ const (
// Time allowed to read the next pong message from the peer.
pongWait = 30 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 51200
)

@ -16,10 +16,11 @@ package msggateway
import (
"encoding/json"
"github.com/openimsdk/tools/apiresp"
"net/http"
"time"
"github.com/openimsdk/tools/apiresp"
"github.com/gorilla/websocket"
"github.com/openimsdk/tools/errs"
)

@ -33,6 +33,7 @@ import (
"github.com/openimsdk/tools/s3/cos"
"github.com/openimsdk/tools/s3/minio"
"github.com/openimsdk/tools/s3/oss"
"github.com/openimsdk/tools/s3/kodo"
"google.golang.org/grpc"
)
@ -82,6 +83,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
o, err = cos.NewCos(*config.RpcConfig.Object.Cos.Build())
case "oss":
o, err = oss.NewOSS(*config.RpcConfig.Object.Oss.Build())
case "kodo":
o, err = kodo.NewKodo(*config.RpcConfig.Object.Kodo.Build())
default:
err = fmt.Errorf("invalid object enable: %s", enable)
}

@ -21,6 +21,7 @@ import (
"github.com/openimsdk/tools/s3/cos"
"github.com/openimsdk/tools/s3/minio"
"github.com/openimsdk/tools/s3/oss"
"github.com/openimsdk/tools/s3/kodo"
"strings"
"time"
)
@ -281,15 +282,7 @@ type Third struct {
Enable string `mapstructure:"enable"`
Cos Cos `mapstructure:"cos"`
Oss Oss `mapstructure:"oss"`
Kodo struct {
Endpoint string `mapstructure:"endpoint"`
Bucket string `mapstructure:"bucket"`
BucketURL string `mapstructure:"bucketURL"`
AccessKeyID string `mapstructure:"accessKeyID"`
AccessKeySecret string `mapstructure:"accessKeySecret"`
SessionToken string `mapstructure:"sessionToken"`
PublicRead bool `mapstructure:"publicRead"`
} `mapstructure:"kodo"`
Kodo Kodo `mapstructure:"kodo"`
Aws struct {
Endpoint string `mapstructure:"endpoint"`
Region string `mapstructure:"region"`
@ -317,6 +310,16 @@ type Oss struct {
PublicRead bool `mapstructure:"publicRead"`
}
type Kodo struct {
Endpoint string `mapstructure:"endpoint"`
Bucket string `mapstructure:"bucket"`
BucketURL string `mapstructure:"bucketURL"`
AccessKeyID string `mapstructure:"accessKeyID"`
AccessKeySecret string `mapstructure:"accessKeySecret"`
SessionToken string `mapstructure:"sessionToken"`
PublicRead bool `mapstructure:"publicRead"`
}
type User struct {
RPC struct {
RegisterIP string `mapstructure:"registerIP"`
@ -528,6 +531,18 @@ func (o *Oss) Build() *oss.Config {
}
}
func (o *Kodo) Build() *kodo.Config {
return &kodo.Config{
Endpoint: o.Endpoint,
Bucket: o.Bucket,
BucketURL: o.BucketURL,
AccessKeyID: o.AccessKeyID,
AccessKeySecret: o.AccessKeySecret,
SessionToken: o.SessionToken,
PublicRead: o.PublicRead,
}
}
func (l *CacheConfig) Failed() time.Duration {
return time.Second * time.Duration(l.FailedExpire)
}

@ -55,7 +55,6 @@ func (a *authDatabase) SetTokenMapByUidPid(ctx context.Context, userID string, p
// Create Token.
func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) {
isCreate := true // flag is create or update
tokens, err := a.cache.GetTokensWithoutError(ctx, userID, platformID)
if err != nil {
return "", err
@ -66,9 +65,6 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
if err != nil || v != constant.NormalToken {
deleteTokenKey = append(deleteTokenKey, k)
}
if v == constant.NormalToken {
isCreate = false
}
}
if len(deleteTokenKey) != 0 {
err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey)
@ -84,16 +80,8 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
return "", errs.WrapMsg(err, "token.SignedString")
}
if isCreate {
// should create,should specify expiration time
if err = a.cache.SetTokenFlagEx(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil {
return "", err
}
} else {
// should update
if err = a.cache.SetTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil {
return "", err
}
if err = a.cache.SetTokenFlagEx(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil {
return "", err
}
return tokenString, nil
}

Loading…
Cancel
Save