Merge remote-tracking branch 'origin/v3.8-js-sdk-only'

# Conflicts:
#	go.mod
#	go.sum
pull/2856/head
withchao 10 months ago
commit 789a215d37

@ -23,4 +23,5 @@ longConnSvr:
# WebSocket connection handshake timeout in seconds # WebSocket connection handshake timeout in seconds
websocketTimeout: 10 websocketTimeout: 10
# 1: For Android, iOS, Windows, Mac, and web platforms, only one instance can be online at a time
multiLoginPolicy: 1

@ -10,7 +10,7 @@ prometheus:
# Enable or disable Prometheus monitoring # Enable or disable Prometheus monitoring
enable: true enable: true
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
ports: [ 12170, 12171, 12172, 12173, 12174, 12175, 12176, 12177, 12178, 12179, 12180, 12182, 12183, 12184, 12185, 12186 ] ports: [ 12170, 12171, 12172, 12173, 12174, 12175, 12176, 12177, 12178, 12179, 12180, 12181, 12182, 12183, 12184, 12185 ]
maxConcurrentWorkers: 3 maxConcurrentWorkers: 3
#Use geTui for offline push notifications, or choose fcm or jpns; corresponding configuration settings must be specified. #Use geTui for offline push notifications, or choose fcm or jpns; corresponding configuration settings must be specified.

@ -46,7 +46,7 @@ scrape_configs:
- job_name: openimserver-openim-push - job_name: openimserver-openim-push
static_configs: static_configs:
- targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177 ] - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177 ]
# - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177, internal_ip:12178, internal_ip:12179, internal_ip:12180, internal_ip:12182, internal_ip:12183, internal_ip:12184, internal_ip:12185, internal_ip:12186 ] # - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177, internal_ip:12178, internal_ip:12179, internal_ip:12180, internal_ip:12181, internal_ip:12182, internal_ip:12183, internal_ip:12184, internal_ip:12185 ]
labels: labels:
namespace: default namespace: default
- job_name: openimserver-openim-rpc-auth - job_name: openimserver-openim-rpc-auth

@ -16,6 +16,7 @@ package msggateway
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"runtime/debug" "runtime/debug"
"sync" "sync"
@ -69,6 +70,8 @@ type Client struct {
IsCompress bool `json:"isCompress"` IsCompress bool `json:"isCompress"`
UserID string `json:"userID"` UserID string `json:"userID"`
IsBackground bool `json:"isBackground"` IsBackground bool `json:"isBackground"`
SDKType string `json:"sdkType"`
Encoder Encoder
ctx *UserConnContext ctx *UserConnContext
longConnServer LongConnServer longConnServer LongConnServer
closed atomic.Bool closed atomic.Bool
@ -81,7 +84,7 @@ type Client struct {
} }
// ResetClient updates the client's state with new connection and context information. // ResetClient updates the client's state with new connection and context information.
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer LongConnServer) { func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer LongConnServer, sdkType string) {
c.w = new(sync.Mutex) c.w = new(sync.Mutex)
c.conn = conn c.conn = conn
c.PlatformID = stringutil.StringToInt(ctx.GetPlatformID()) c.PlatformID = stringutil.StringToInt(ctx.GetPlatformID())
@ -94,11 +97,20 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer
c.closed.Store(false) c.closed.Store(false)
c.closedErr = nil c.closedErr = nil
c.token = ctx.GetToken() c.token = ctx.GetToken()
c.SDKType = sdkType
c.hbCtx, c.hbCancel = context.WithCancel(c.ctx) c.hbCtx, c.hbCancel = context.WithCancel(c.ctx)
c.subLock = new(sync.Mutex) c.subLock = new(sync.Mutex)
if c.subUserIDs != nil { if c.subUserIDs != nil {
clear(c.subUserIDs) clear(c.subUserIDs)
} }
if c.SDKType == "" {
c.SDKType = GoSDK
}
if c.SDKType == GoSDK {
c.Encoder = NewGobEncoder()
} else {
c.Encoder = NewJsonEncoder()
}
c.subUserIDs = make(map[string]struct{}) c.subUserIDs = make(map[string]struct{})
} }
@ -159,9 +171,12 @@ func (c *Client) readMessage() {
return return
} }
case MessageText: case MessageText:
c.closedErr = ErrNotSupportMessageProtocol _ = c.conn.SetReadDeadline(pongWait)
return parseDataErr := c.handlerTextMessage(message)
if parseDataErr != nil {
c.closedErr = parseDataErr
return
}
case PingMessage: case PingMessage:
err := c.writePongMsg("") err := c.writePongMsg("")
log.ZError(c.ctx, "writePongMsg", err) log.ZError(c.ctx, "writePongMsg", err)
@ -188,7 +203,7 @@ func (c *Client) handleMessage(message []byte) error {
var binaryReq = getReq() var binaryReq = getReq()
defer freeReq(binaryReq) defer freeReq(binaryReq)
err := c.longConnServer.Decode(message, binaryReq) err := c.Encoder.Decode(message, binaryReq)
if err != nil { if err != nil {
return err return err
} }
@ -335,7 +350,7 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
return nil return nil
} }
encodedBuf, err := c.longConnServer.Encode(resp) encodedBuf, err := c.Encoder.Encode(resp)
if err != nil { if err != nil {
return err return err
} }
@ -419,3 +434,26 @@ func (c *Client) writePongMsg(appData string) error {
return errs.Wrap(err) return errs.Wrap(err)
} }
func (c *Client) handlerTextMessage(b []byte) error {
var msg TextMessage
if err := json.Unmarshal(b, &msg); err != nil {
return err
}
switch msg.Type {
case TextPong:
return nil
case TextPing:
msg.Type = TextPong
msgData, err := json.Marshal(msg)
if err != nil {
return err
}
if err := c.conn.SetWriteDeadline(writeWait); err != nil {
return err
}
return c.conn.WriteMessage(MessageText, msgData)
default:
return fmt.Errorf("not support message type %s", msg.Type)
}
}

@ -27,6 +27,12 @@ const (
GzipCompressionProtocol = "gzip" GzipCompressionProtocol = "gzip"
BackgroundStatus = "isBackground" BackgroundStatus = "isBackground"
SendResponse = "isMsgResp" SendResponse = "isMsgResp"
SDKType = "sdkType"
)
const (
GoSDK = "go"
JsSDK = "js"
) )
const ( const (

@ -193,7 +193,11 @@ func (c *UserConnContext) ParseEssentialArgs() error {
_, err := strconv.Atoi(platformIDStr) _, err := strconv.Atoi(platformIDStr)
if err != nil { if err != nil {
return servererrs.ErrConnArgsErr.WrapMsg("platformID is not int") return servererrs.ErrConnArgsErr.WrapMsg("platformID is not int")
}
switch sdkType, _ := c.Query(SDKType); sdkType {
case "", GoSDK, JsSDK:
default:
return servererrs.ErrConnArgsErr.WrapMsg("sdkType is not go or js")
} }
return nil return nil
} }

@ -17,6 +17,7 @@ package msggateway
import ( import (
"bytes" "bytes"
"encoding/gob" "encoding/gob"
"encoding/json"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
) )
@ -28,12 +29,12 @@ type Encoder interface {
type GobEncoder struct{} type GobEncoder struct{}
func NewGobEncoder() *GobEncoder { func NewGobEncoder() Encoder {
return &GobEncoder{} return GobEncoder{}
} }
func (g *GobEncoder) Encode(data any) ([]byte, error) { func (g GobEncoder) Encode(data any) ([]byte, error) {
buff := bytes.Buffer{} var buff bytes.Buffer
enc := gob.NewEncoder(&buff) enc := gob.NewEncoder(&buff)
if err := enc.Encode(data); err != nil { if err := enc.Encode(data); err != nil {
return nil, errs.WrapMsg(err, "GobEncoder.Encode failed", "action", "encode") return nil, errs.WrapMsg(err, "GobEncoder.Encode failed", "action", "encode")
@ -41,7 +42,7 @@ func (g *GobEncoder) Encode(data any) ([]byte, error) {
return buff.Bytes(), nil return buff.Bytes(), nil
} }
func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error { func (g GobEncoder) Decode(encodeData []byte, decodeData any) error {
buff := bytes.NewBuffer(encodeData) buff := bytes.NewBuffer(encodeData)
dec := gob.NewDecoder(buff) dec := gob.NewDecoder(buff)
if err := dec.Decode(decodeData); err != nil { if err := dec.Decode(decodeData); err != nil {
@ -49,3 +50,25 @@ func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error {
} }
return nil return nil
} }
type JsonEncoder struct{}
func NewJsonEncoder() Encoder {
return JsonEncoder{}
}
func (g JsonEncoder) Encode(data any) ([]byte, error) {
b, err := json.Marshal(data)
if err != nil {
return nil, errs.New("JsonEncoder.Encode failed", "action", "encode")
}
return b, nil
}
func (g JsonEncoder) Decode(encodeData []byte, decodeData any) error {
err := json.Unmarshal(encodeData, decodeData)
if err != nil {
return errs.New("JsonEncoder.Decode failed", "action", "decode")
}
return nil
}

@ -0,0 +1,48 @@
package msggateway
import (
"testing"
)
func TestGobEncoder_Encode(t *testing.T) {
encoder := NewGobEncoder()
// 测试用例1: 编码 []byte 数据
inputData := []byte("test data")
encodedData, err := encoder.Encode(inputData)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if string(encodedData) != string(inputData) {
t.Fatalf("expected encoded data to be '%s', got '%s'", inputData, encodedData)
}
// 测试用例2: 编码非 []byte 数据
nonByteData := "string data"
_, err = encoder.Encode(nonByteData)
if err == nil {
t.Fatalf("expected an error when encoding non-byte data, got none")
}
}
func TestGobEncoder_Decode(t *testing.T) {
encoder := NewGobEncoder()
// 测试用例1: 解码到 []byte 数据
encodedData := []byte("test data")
var decodedData []byte
err := encoder.Decode(encodedData, &decodedData)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if string(decodedData) != string(encodedData) {
t.Fatalf("expected decoded data to be '%s', got '%s'", encodedData, decodedData)
}
// 测试用例2: 解码到非 []byte 数据
var nonByteData string
err = encoder.Decode(encodedData, &nonByteData)
if err == nil {
t.Fatalf("expected an error when decoding to non-byte data, got none")
}
}

@ -83,17 +83,11 @@ func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready f
return s return s
} }
func (s *Server) OnlinePushMsg( func (s *Server) OnlinePushMsg(context context.Context, req *msggateway.OnlinePushMsgReq) (*msggateway.OnlinePushMsgResp, error) {
context context.Context,
req *msggateway.OnlinePushMsgReq,
) (*msggateway.OnlinePushMsgResp, error) {
panic("implement me") panic("implement me")
} }
func (s *Server) GetUsersOnlineStatus( func (s *Server) GetUsersOnlineStatus(ctx context.Context, req *msggateway.GetUsersOnlineStatusReq) (*msggateway.GetUsersOnlineStatusResp, error) {
ctx context.Context,
req *msggateway.GetUsersOnlineStatusReq,
) (*msggateway.GetUsersOnlineStatusResp, error) {
if !authverify.IsAppManagerUid(ctx, s.config.Share.IMAdminUserID) { if !authverify.IsAppManagerUid(ctx, s.config.Share.IMAdminUserID) {
return nil, errs.ErrNoPermission.WrapMsg("only app manager") return nil, errs.ErrNoPermission.WrapMsg("only app manager")
} }
@ -155,6 +149,7 @@ func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.M
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) { (client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
err := client.PushMessage(ctx, msgData) err := client.PushMessage(ctx, msgData)
if err != nil { if err != nil {
log.ZWarn(ctx, "online push msg failed", err, "userID", userID, "platformID", client.PlatformID)
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code()) userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
} else { } else {
if _, ok := s.pushTerminal[client.PlatformID]; ok { if _, ok := s.pushTerminal[client.PlatformID]; ok {
@ -220,10 +215,7 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msgga
} }
} }
func (s *Server) KickUserOffline( func (s *Server) KickUserOffline(ctx context.Context, req *msggateway.KickUserOfflineReq) (*msggateway.KickUserOfflineResp, error) {
ctx context.Context,
req *msggateway.KickUserOfflineReq,
) (*msggateway.KickUserOfflineResp, error) {
for _, v := range req.KickUserIDList { for _, v := range req.KickUserIDList {
clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID)) clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID))
if !ok { if !ok {

@ -16,6 +16,7 @@ package msggateway
import ( import (
"context" "context"
"encoding/json"
"sync" "sync"
"github.com/go-playground/validator/v10" "github.com/go-playground/validator/v10"
@ -31,6 +32,16 @@ import (
"github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/jsonutil"
) )
const (
TextPing = "ping"
TextPong = "pong"
)
type TextMessage struct {
Type string `json:"type"`
Body json.RawMessage `json:"body"`
}
type Req struct { type Req struct {
ReqIdentifier int32 `json:"reqIdentifier" validate:"required"` ReqIdentifier int32 `json:"reqIdentifier" validate:"required"`
Token string `json:"token"` Token string `json:"token"`

@ -37,7 +37,6 @@ type LongConnServer interface {
SetKickHandlerInfo(i *kickHandler) SetKickHandlerInfo(i *kickHandler)
SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error) SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error)
Compressor Compressor
Encoder
MessageHandler MessageHandler
} }
@ -61,7 +60,7 @@ type WsServer struct {
authClient *rpcclient.Auth authClient *rpcclient.Auth
disCov discovery.SvcDiscoveryRegistry disCov discovery.SvcDiscoveryRegistry
Compressor Compressor
Encoder //Encoder
MessageHandler MessageHandler
webhookClient *webhook.Client webhookClient *webhook.Client
} }
@ -135,7 +134,6 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer {
clients: newUserMap(), clients: newUserMap(),
subscription: newSubscription(), subscription: newSubscription(),
Compressor: NewGzipCompressor(), Compressor: NewGzipCompressor(),
Encoder: NewGobEncoder(),
webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL), webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL),
} }
} }
@ -278,14 +276,7 @@ func (ws *WsServer) registerClient(client *Client) {
wg.Wait() wg.Wait()
log.ZDebug( log.ZDebug(client.ctx, "user online", "online user Num", ws.onlineUserNum.Load(), "online user conn Num", ws.onlineUserConnNum.Load())
client.ctx,
"user online",
"online user Num",
ws.onlineUserNum.Load(),
"online user conn Num",
ws.onlineUserConnNum.Load(),
)
} }
func getRemoteAdders(client []*Client) string { func getRemoteAdders(client []*Client) string {
@ -464,7 +455,8 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
// Retrieve a client object from the client pool, reset its state, and associate it with the current WebSocket long connection // Retrieve a client object from the client pool, reset its state, and associate it with the current WebSocket long connection
client := ws.clientPool.Get().(*Client) client := ws.clientPool.Get().(*Client)
client.ResetClient(connContext, wsLongConn, ws) sdkType, _ := connContext.Query(SDKType)
client.ResetClient(connContext, wsLongConn, ws, sdkType)
// Register the client with the server and start message processing // Register the client with the server and start message processing
ws.registerChan <- client ws.registerChan <- client

@ -2,15 +2,14 @@ package controller
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/log"
"github.com/golang-jwt/jwt/v4" "github.com/golang-jwt/jwt/v4"
"github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/tokenverify" "github.com/openimsdk/tools/tokenverify"
) )

Loading…
Cancel
Save