diff --git a/.env b/.env index 0c02bf9f6..2b903c949 100644 --- a/.env +++ b/.env @@ -1,6 +1,5 @@ -MONGO_IMAGE=mongo:6.0.2 +MONGO_IMAGE=mongo:7.0 REDIS_IMAGE=redis:7.0.0 -ZOOKEEPER_IMAGE=bitnami/zookeeper:3.8 KAFKA_IMAGE=bitnami/kafka:3.5.1 MINIO_IMAGE=minio/minio:RELEASE.2024-01-11T07-46-16Z ETCD_IMAGE=quay.io/coreos/etcd:v3.5.13 @@ -16,4 +15,3 @@ OPENIM_ADMIN_FRONT_IMAGE=openim/openim-admin-front:release-v1.8.2 #OPENIM_ADMIN_FRONT_IMAGE=registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-admin-front:release-v1.8.2 DATA_DIR=./ - diff --git a/Dockerfile b/Dockerfile index 1c3de867b..3115ffe77 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -# Use Go 1.21 Alpine as the base image for building the application +# Use Go 1.22 Alpine as the base image for building the application FROM golang:1.22-alpine AS builder # Define the base directory for the application as an environment variable diff --git a/config/mongodb.yml b/config/mongodb.yml index 78f85992c..072cb4b8f 100644 --- a/config/mongodb.yml +++ b/config/mongodb.yml @@ -8,6 +8,8 @@ database: openim_v3 username: openIM # Password for database authentication password: openIM123 +# Authentication source for database authentication, if use root user, set it to admin +authSource: openim_v3 # Maximum number of connections in the connection pool maxPoolSize: 100 # Maximum number of retry attempts for a failed database connection diff --git a/config/openim-msggateway.yml b/config/openim-msggateway.yml index 5659c6f9b..6c46b52a8 100644 --- a/config/openim-msggateway.yml +++ b/config/openim-msggateway.yml @@ -22,5 +22,3 @@ longConnSvr: websocketMaxMsgLen: 4096 # WebSocket connection handshake timeout in seconds websocketTimeout: 10 - - diff --git a/config/openim-push.yml b/config/openim-push.yml index 4d2aaca6b..70e67add2 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -13,29 +13,29 @@ prometheus: ports: [ 12170, 12171, 12172, 12173, 12174, 12175, 12176, 12177, 12178, 12179, 12180, 12182, 12183, 12184, 12185, 12186 ] maxConcurrentWorkers: 3 -#Use geTui for offline push notifications, or choose fcm or jpns; corresponding configuration settings must be specified. +#Use geTui for offline push notifications, or choose fcm or jpush; corresponding configuration settings must be specified. enable: geTui geTui: pushUrl: https://restapi.getui.com/v2/$appId - masterSecret: - appKey: - intent: - channelID: - channelName: + masterSecret: + appKey: + intent: + channelID: + channelName: fcm: # Prioritize using file paths. If the file path is empty, use URL - filePath: # File path is concatenated with the parameters passed in through - c(`mage` default pass in `config/`) and filePath. + filePath: # File path is concatenated with the parameters passed in through - c(`mage` default pass in `config/`) and filePath. authURL: # Must start with https or http. -jpns: - appKey: - masterSecret: - pushURL: - pushIntent: +jpush: + appKey: + masterSecret: + pushURL: + pushIntent: # iOS system push sound and badge count iosPush: - pushSound: xxx - badgeCount: true - production: false + pushSound: xxx + badgeCount: true + production: false fullUserCache: true diff --git a/deployments/templates/config.yaml b/deployments/templates/config.yaml index fee0bf90a..c108de5e8 100644 --- a/deployments/templates/config.yaml +++ b/deployments/templates/config.yaml @@ -240,11 +240,11 @@ push: channelName: ${GETUI_CHANNEL_NAME} fcm: serviceAccount: "${FCM_SERVICE_ACCOUNT}" - jpns: - appKey: ${JPNS_APP_KEY} - masterSecret: ${JPNS_MASTER_SECRET} - pushUrl: ${JPNS_PUSH_URL} - pushIntent: ${JPNS_PUSH_INTENT} + jpush: + appKey: ${JPUSH_APP_KEY} + masterSecret: ${JPUSH_MASTER_SECRET} + pushUrl: ${JPUSH_PUSH_URL} + pushIntent: ${JPUSH_PUSH_INTENT} # App manager configuration # diff --git a/docker-compose.yml b/docker-compose.yml index 6d88bac10..8d25383bc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,12 +8,35 @@ services: ports: - "37017:27017" container_name: mongo - command: ["/bin/bash", "-c", "/docker-entrypoint-initdb.d/mongo-init.sh; docker-entrypoint.sh mongod --wiredTigerCacheSizeGB 1 --auth"] + command: > + bash -c ' + docker-entrypoint.sh mongod --wiredTigerCacheSizeGB $$wiredTigerCacheSizeGB --auth & + until mongosh -u $$MONGO_INITDB_ROOT_USERNAME -p $$MONGO_INITDB_ROOT_PASSWORD --authenticationDatabase admin --eval "db.runCommand({ ping: 1 })" &>/dev/null; do + echo "Waiting for MongoDB to start..." + sleep 1 + done && + mongosh -u $$MONGO_INITDB_ROOT_USERNAME -p $$MONGO_INITDB_ROOT_PASSWORD --authenticationDatabase admin --eval " + db = db.getSiblingDB(\"$$MONGO_INITDB_DATABASE\"); + if (!db.getUser(\"$$MONGO_OPENIM_USERNAME\")) { + db.createUser({ + user: \"$$MONGO_OPENIM_USERNAME\", + pwd: \"$$MONGO_OPENIM_PASSWORD\", + roles: [{role: \"readWrite\", db: \"$$MONGO_INITDB_DATABASE\"}] + }); + print(\"User created successfully: \"); + print(\"Username: $$MONGO_OPENIM_USERNAME\"); + print(\"Password: $$MONGO_OPENIM_PASSWORD\"); + print(\"Database: $$MONGO_INITDB_DATABASE\"); + } else { + print(\"User already exists in database: $$MONGO_INITDB_DATABASE, Username: $$MONGO_OPENIM_USERNAME\"); + } + " && + tail -f /dev/null + ' volumes: - "${DATA_DIR}/components/mongodb/data/db:/data/db" - "${DATA_DIR}/components/mongodb/data/logs:/data/logs" - "${DATA_DIR}/components/mongodb/data/conf:/etc/mongo" - - "./scripts/mongo-init.sh:/docker-entrypoint-initdb.d/mongo-init.sh:ro" environment: - TZ=Asia/Shanghai - wiredTigerCacheSizeGB=1 @@ -71,10 +94,7 @@ services: ports: - "19094:9094" volumes: - - ./scripts/create-topic.sh:/opt/bitnami/kafka/create-topic.sh - "${DATA_DIR}/components/kafka:/bitnami/kafka" - command: > - bash -c "/opt/bitnami/scripts/kafka/run.sh & /opt/bitnami/kafka/create-topic.sh; wait" environment: #KAFKA_HEAP_OPTS: "-Xms128m -Xmx256m" TZ: Asia/Shanghai @@ -85,10 +105,11 @@ services: KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094 KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_NUM_PARTITIONS: 8 + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true" networks: - openim - minio: image: "${MINIO_IMAGE}" ports: @@ -124,7 +145,7 @@ services: - "11002:80" networks: - openim - + # prometheus: # image: ${PROMETHEUS_IMAGE} # container_name: prometheus @@ -171,4 +192,3 @@ services: # - ${DATA_DIR:-./}/components/grafana:/var/lib/grafana # networks: # - openim - diff --git a/docs/contrib/environment.md b/docs/contrib/environment.md index d2db7cbf3..0b10abc96 100644 --- a/docs/contrib/environment.md +++ b/docs/contrib/environment.md @@ -474,10 +474,10 @@ This section involves setting up additional configuration variables for Websocke | GETUI_CHANNEL_ID | [User Defined] | GeTui Channel ID | | GETUI_CHANNEL_NAME | [User Defined] | GeTui Channel Name | | FCM_SERVICE_ACCOUNT | "x.json" | FCM Service Account | -| JPNS_APP_KEY | [User Defined] | JPNS Application Key | -| JPNS_MASTER_SECRET | [User Defined] | JPNS Master Secret | -| JPNS_PUSH_URL | [User Defined] | JPNS Push Notification URL | -| JPNS_PUSH_INTENT | [User Defined] | JPNS Push Intent | +| JPUSH_APP_KEY | [User Defined] | JPUSH Application Key | +| JPUSH_MASTER_SECRET | [User Defined] | JPUSH Master Secret | +| JPUSH_PUSH_URL | [User Defined] | JPUSH Push Notification URL | +| JPUSH_PUSH_INTENT | [User Defined] | JPUSH Push Intent | | IM_ADMIN_USERID | "imAdmin" | IM Administrator ID | | IM_ADMIN_NAME | "imAdmin" | IM Administrator Nickname | | MULTILOGIN_POLICY | "1" | Multi-login Policy | diff --git a/go.mod b/go.mod index 5ba69f6cb..9a3b44d35 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.72-alpha.54 - github.com/openimsdk/tools v0.0.50-alpha.23 + github.com/openimsdk/tools v0.0.50-alpha.32 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 @@ -37,7 +37,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/kelindar/bitmap v1.5.2 github.com/likexian/gokit v0.25.13 - github.com/mo3et/openim-gomake v0.0.18 + github.com/openimsdk/gomake v0.0.14-alpha.5 github.com/redis/go-redis/v9 v9.4.0 github.com/robfig/cron/v3 v3.0.1 github.com/shirou/gopsutil v3.21.11+incompatible diff --git a/go.sum b/go.sum index 2ed7788e9..8d67573e4 100644 --- a/go.sum +++ b/go.sum @@ -325,8 +325,6 @@ github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5 github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= -github.com/mo3et/openim-gomake v0.0.18 h1:PlX+Sx6UqIBKhe+BlmU8fHs5BbUKqMl4rKqWGDxGQCA= -github.com/mo3et/openim-gomake v0.0.18/go.mod h1:gyCVDWC6L32//+HIf47EeB53S2JYsl3QZDFpC/m6kbE= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -347,10 +345,12 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= +github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= +github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.72-alpha.54 h1:opato7N4QjjRq/SHD54bDSVBpOEEDp1VLWVk5Os2A9s= github.com/openimsdk/protocol v0.0.72-alpha.54/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.50-alpha.23 h1:gGCOVDFC/wrj2ybLrSrykGlcTfB1Z7MwfiWpEnjJKyU= -github.com/openimsdk/tools v0.0.50-alpha.23/go.mod h1:r5U6RbxcR4xhKb2fhTmKGC9Yt5LcErHBVt3lhXQIHSo= +github.com/openimsdk/tools v0.0.50-alpha.32 h1:JEsUFHFnaYg230TG+Ke3SUnaA2h44t4kABAzEdv5VZw= +github.com/openimsdk/tools v0.0.50-alpha.32/go.mod h1:r5U6RbxcR4xhKb2fhTmKGC9Yt5LcErHBVt3lhXQIHSo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index bc06fa950..af96e7d46 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -16,6 +16,7 @@ package msggateway import ( "context" + "encoding/json" "fmt" "runtime/debug" "sync" @@ -69,6 +70,8 @@ type Client struct { IsCompress bool `json:"isCompress"` UserID string `json:"userID"` IsBackground bool `json:"isBackground"` + SDKType string `json:"sdkType"` + Encoder Encoder ctx *UserConnContext longConnServer LongConnServer closed atomic.Bool @@ -94,11 +97,17 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer c.closed.Store(false) c.closedErr = nil c.token = ctx.GetToken() + c.SDKType = ctx.GetSDKType() c.hbCtx, c.hbCancel = context.WithCancel(c.ctx) c.subLock = new(sync.Mutex) if c.subUserIDs != nil { clear(c.subUserIDs) } + if c.SDKType == GoSDK { + c.Encoder = NewGobEncoder() + } else { + c.Encoder = NewJsonEncoder() + } c.subUserIDs = make(map[string]struct{}) } @@ -159,9 +168,12 @@ func (c *Client) readMessage() { return } case MessageText: - c.closedErr = ErrNotSupportMessageProtocol - return - + _ = c.conn.SetReadDeadline(pongWait) + parseDataErr := c.handlerTextMessage(message) + if parseDataErr != nil { + c.closedErr = parseDataErr + return + } case PingMessage: err := c.writePongMsg("") log.ZError(c.ctx, "writePongMsg", err) @@ -188,7 +200,7 @@ func (c *Client) handleMessage(message []byte) error { var binaryReq = getReq() defer freeReq(binaryReq) - err := c.longConnServer.Decode(message, binaryReq) + err := c.Encoder.Decode(message, binaryReq) if err != nil { return err } @@ -335,7 +347,7 @@ func (c *Client) writeBinaryMsg(resp Resp) error { return nil } - encodedBuf, err := c.longConnServer.Encode(resp) + encodedBuf, err := c.Encoder.Encode(resp) if err != nil { return err } @@ -419,3 +431,26 @@ func (c *Client) writePongMsg(appData string) error { return errs.Wrap(err) } + +func (c *Client) handlerTextMessage(b []byte) error { + var msg TextMessage + if err := json.Unmarshal(b, &msg); err != nil { + return err + } + switch msg.Type { + case TextPong: + return nil + case TextPing: + msg.Type = TextPong + msgData, err := json.Marshal(msg) + if err != nil { + return err + } + if err := c.conn.SetWriteDeadline(writeWait); err != nil { + return err + } + return c.conn.WriteMessage(MessageText, msgData) + default: + return fmt.Errorf("not support message type %s", msg.Type) + } +} diff --git a/internal/msggateway/constant.go b/internal/msggateway/constant.go index 584cebe1e..a825c0519 100644 --- a/internal/msggateway/constant.go +++ b/internal/msggateway/constant.go @@ -27,6 +27,12 @@ const ( GzipCompressionProtocol = "gzip" BackgroundStatus = "isBackground" SendResponse = "isMsgResp" + SDKType = "sdkType" +) + +const ( + GoSDK = "go" + JsSDK = "js" ) const ( diff --git a/internal/msggateway/context.go b/internal/msggateway/context.go index 3909766b1..d73a96df4 100644 --- a/internal/msggateway/context.go +++ b/internal/msggateway/context.go @@ -153,6 +153,14 @@ func (c *UserConnContext) GetCompression() bool { return false } +func (c *UserConnContext) GetSDKType() string { + sdkType := c.Req.URL.Query().Get(SDKType) + if sdkType == "" { + sdkType = GoSDK + } + return sdkType +} + func (c *UserConnContext) ShouldSendResp() bool { errResp, exists := c.Query(SendResponse) if exists { @@ -193,7 +201,11 @@ func (c *UserConnContext) ParseEssentialArgs() error { _, err := strconv.Atoi(platformIDStr) if err != nil { return servererrs.ErrConnArgsErr.WrapMsg("platformID is not int") - + } + switch sdkType, _ := c.Query(SDKType); sdkType { + case "", GoSDK, JsSDK: + default: + return servererrs.ErrConnArgsErr.WrapMsg("sdkType is not go or js") } return nil } diff --git a/internal/msggateway/encoder.go b/internal/msggateway/encoder.go index 3af266374..6a5936d6d 100644 --- a/internal/msggateway/encoder.go +++ b/internal/msggateway/encoder.go @@ -17,6 +17,7 @@ package msggateway import ( "bytes" "encoding/gob" + "encoding/json" "github.com/openimsdk/tools/errs" ) @@ -28,12 +29,12 @@ type Encoder interface { type GobEncoder struct{} -func NewGobEncoder() *GobEncoder { - return &GobEncoder{} +func NewGobEncoder() Encoder { + return GobEncoder{} } -func (g *GobEncoder) Encode(data any) ([]byte, error) { - buff := bytes.Buffer{} +func (g GobEncoder) Encode(data any) ([]byte, error) { + var buff bytes.Buffer enc := gob.NewEncoder(&buff) if err := enc.Encode(data); err != nil { return nil, errs.WrapMsg(err, "GobEncoder.Encode failed", "action", "encode") @@ -41,7 +42,7 @@ func (g *GobEncoder) Encode(data any) ([]byte, error) { return buff.Bytes(), nil } -func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error { +func (g GobEncoder) Decode(encodeData []byte, decodeData any) error { buff := bytes.NewBuffer(encodeData) dec := gob.NewDecoder(buff) if err := dec.Decode(decodeData); err != nil { @@ -49,3 +50,25 @@ func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error { } return nil } + +type JsonEncoder struct{} + +func NewJsonEncoder() Encoder { + return JsonEncoder{} +} + +func (g JsonEncoder) Encode(data any) ([]byte, error) { + b, err := json.Marshal(data) + if err != nil { + return nil, errs.New("JsonEncoder.Encode failed", "action", "encode") + } + return b, nil +} + +func (g JsonEncoder) Decode(encodeData []byte, decodeData any) error { + err := json.Unmarshal(encodeData, decodeData) + if err != nil { + return errs.New("JsonEncoder.Decode failed", "action", "decode") + } + return nil +} diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index e96ab4b0d..23d915013 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -83,17 +83,11 @@ func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready f return s } -func (s *Server) OnlinePushMsg( - context context.Context, - req *msggateway.OnlinePushMsgReq, -) (*msggateway.OnlinePushMsgResp, error) { +func (s *Server) OnlinePushMsg(context context.Context, req *msggateway.OnlinePushMsgReq) (*msggateway.OnlinePushMsgResp, error) { panic("implement me") } -func (s *Server) GetUsersOnlineStatus( - ctx context.Context, - req *msggateway.GetUsersOnlineStatusReq, -) (*msggateway.GetUsersOnlineStatusResp, error) { +func (s *Server) GetUsersOnlineStatus(ctx context.Context, req *msggateway.GetUsersOnlineStatusReq) (*msggateway.GetUsersOnlineStatusResp, error) { if !authverify.IsAppManagerUid(ctx, s.config.Share.IMAdminUserID) { return nil, errs.ErrNoPermission.WrapMsg("only app manager") } @@ -155,6 +149,7 @@ func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.M (client.IsBackground && client.PlatformID != constant.IOSPlatformID) { err := client.PushMessage(ctx, msgData) if err != nil { + log.ZWarn(ctx, "online push msg failed", err, "userID", userID, "platformID", client.PlatformID) userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code()) } else { if _, ok := s.pushTerminal[client.PlatformID]; ok { @@ -220,10 +215,7 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msgga } } -func (s *Server) KickUserOffline( - ctx context.Context, - req *msggateway.KickUserOfflineReq, -) (*msggateway.KickUserOfflineResp, error) { +func (s *Server) KickUserOffline(ctx context.Context, req *msggateway.KickUserOfflineReq) (*msggateway.KickUserOfflineResp, error) { for _, v := range req.KickUserIDList { clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID)) if !ok { diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index 4b78c1004..5407ba90c 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -16,6 +16,7 @@ package msggateway import ( "context" + "encoding/json" "sync" "github.com/go-playground/validator/v10" @@ -31,6 +32,16 @@ import ( "github.com/openimsdk/tools/utils/jsonutil" ) +const ( + TextPing = "ping" + TextPong = "pong" +) + +type TextMessage struct { + Type string `json:"type"` + Body json.RawMessage `json:"body"` +} + type Req struct { ReqIdentifier int32 `json:"reqIdentifier" validate:"required"` Token string `json:"token"` diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 48e4b5cee..e6b4f3fa4 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -37,7 +37,6 @@ type LongConnServer interface { SetKickHandlerInfo(i *kickHandler) SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error) Compressor - Encoder MessageHandler } @@ -61,7 +60,7 @@ type WsServer struct { authClient *rpcclient.Auth disCov discovery.SvcDiscoveryRegistry Compressor - Encoder + //Encoder MessageHandler webhookClient *webhook.Client } @@ -135,7 +134,6 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer { clients: newUserMap(), subscription: newSubscription(), Compressor: NewGzipCompressor(), - Encoder: NewGobEncoder(), webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL), } } @@ -278,14 +276,7 @@ func (ws *WsServer) registerClient(client *Client) { wg.Wait() - log.ZDebug( - client.ctx, - "user online", - "online user Num", - ws.onlineUserNum.Load(), - "online user conn Num", - ws.onlineUserConnNum.Load(), - ) + log.ZDebug(client.ctx, "user online", "online user Num", ws.onlineUserNum.Load(), "online user conn Num", ws.onlineUserConnNum.Load()) } func getRemoteAdders(client []*Client) string { diff --git a/internal/push/offlinepush/jpush/body/notification.go b/internal/push/offlinepush/jpush/body/notification.go index 42e59c46c..383b3fb26 100644 --- a/internal/push/offlinepush/jpush/body/notification.go +++ b/internal/push/offlinepush/jpush/body/notification.go @@ -15,6 +15,7 @@ package body import ( + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) @@ -26,38 +27,44 @@ type Notification struct { type Android struct { Alert string `json:"alert,omitempty"` + Title string `json:"title,omitempty"` Intent struct { URL string `json:"url,omitempty"` } `json:"intent,omitempty"` - Extras Extras `json:"extras"` + Extras map[string]string `json:"extras,omitempty"` } type Ios struct { - Alert string `json:"alert,omitempty"` - Sound string `json:"sound,omitempty"` - Badge string `json:"badge,omitempty"` - Extras Extras `json:"extras"` - MutableContent bool `json:"mutable-content"` + Alert IosAlert `json:"alert,omitempty"` + Sound string `json:"sound,omitempty"` + Badge string `json:"badge,omitempty"` + Extras map[string]string `json:"extras,omitempty"` + MutableContent bool `json:"mutable-content"` } -type Extras struct { - ClientMsgID string `json:"clientMsgID"` +type IosAlert struct { + Title string `json:"title,omitempty"` + Body string `json:"body,omitempty"` } -func (n *Notification) SetAlert(alert string) { +func (n *Notification) SetAlert(alert string, title string, opts *options.Opts) { n.Alert = alert n.Android.Alert = alert - n.IOS.Alert = alert - n.IOS.Sound = "default" - n.IOS.Badge = "+1" + n.Android.Title = title + n.IOS.Alert.Body = alert + n.IOS.Alert.Title = title + n.IOS.Sound = opts.IOSPushSound + if opts.IOSBadgeCount { + n.IOS.Badge = "+1" + } } -func (n *Notification) SetExtras(extras Extras) { +func (n *Notification) SetExtras(extras map[string]string) { n.IOS.Extras = extras n.Android.Extras = extras } func (n *Notification) SetAndroidIntent(pushConf *config.Push) { - n.Android.Intent.URL = pushConf.JPNS.PushIntent + n.Android.Intent.URL = pushConf.JPush.PushIntent } func (n *Notification) IOSEnableMutableContent() { diff --git a/internal/push/offlinepush/jpush/push.go b/internal/push/offlinepush/jpush/push.go index dac52597f..2694902f2 100644 --- a/internal/push/offlinepush/jpush/push.go +++ b/internal/push/offlinepush/jpush/push.go @@ -18,9 +18,9 @@ import ( "context" "encoding/base64" "fmt" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/jpush/body" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/utils/httputil" ) @@ -57,17 +57,23 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin var au body.Audience au.SetAlias(userIDs) var no body.Notification - var extras body.Extras + extras := make(map[string]string) + extras["ex"] = opts.Ex if opts.Signal.ClientMsgID != "" { - extras.ClientMsgID = opts.Signal.ClientMsgID + extras["ClientMsgID"] = opts.Signal.ClientMsgID } no.IOSEnableMutableContent() no.SetExtras(extras) - no.SetAlert(title) + no.SetAlert(content, title, opts) no.SetAndroidIntent(j.pushConf) var msg body.Message msg.SetMsgContent(content) + msg.SetTitle(title) + if opts.Signal.ClientMsgID != "" { + msg.SetExtras("ClientMsgID", opts.Signal.ClientMsgID) + } + msg.SetExtras("ex", opts.Ex) var opt body.Options opt.SetApnsProduction(j.pushConf.IOSPush.Production) var pushObj body.PushObj @@ -76,19 +82,26 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin pushObj.SetNotification(&no) pushObj.SetMessage(&msg) pushObj.SetOptions(&opt) - var resp any - return j.request(ctx, pushObj, resp, 5) + var resp map[string]any + return j.request(ctx, pushObj, &resp, 5) } -func (j *JPush) request(ctx context.Context, po body.PushObj, resp any, timeout int) error { - return j.httpClient.PostReturn( +func (j *JPush) request(ctx context.Context, po body.PushObj, resp *map[string]any, timeout int) error { + err := j.httpClient.PostReturn( ctx, - j.pushConf.JPNS.PushURL, + j.pushConf.JPush.PushURL, map[string]string{ - "Authorization": j.getAuthorization(j.pushConf.JPNS.AppKey, j.pushConf.JPNS.MasterSecret), + "Authorization": j.getAuthorization(j.pushConf.JPush.AppKey, j.pushConf.JPush.MasterSecret), }, po, resp, timeout, ) + if err != nil { + return err + } + if (*resp)["sendno"] != "0" { + return fmt.Errorf("jpush push failed %v", resp) + } + return nil } diff --git a/internal/push/offlinepush_handler.go b/internal/push/offlinepush_handler.go index a80c147f4..5c69da005 100644 --- a/internal/push/offlinepush_handler.go +++ b/internal/push/offlinepush_handler.go @@ -73,7 +73,7 @@ func (o *OfflinePushConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (ti IsAtSelf bool `json:"isAtSelf"` } - opts = &options.Opts{Signal: &options.Signal{}} + opts = &options.Opts{Signal: &options.Signal{ClientMsgID: msg.ClientMsgID}} if msg.OfflinePushInfo != nil { opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 41ad5962a..ae308dfe9 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -4,6 +4,10 @@ import ( "context" "encoding/json" + "math/rand" + "strconv" + "time" + "github.com/IBM/sarama" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" @@ -27,9 +31,6 @@ import ( "github.com/openimsdk/tools/utils/timeutil" "github.com/redis/go-redis/v9" "google.golang.org/protobuf/proto" - "math/rand" - "strconv" - "time" ) type ConsumerHandler struct { @@ -335,6 +336,7 @@ func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID stri func (c *ConsumerHandler) offlinePushMsg(ctx context.Context, msg *sdkws.MsgData, offlinePushUserIDs []string) error { title, content, opts, err := c.getOfflinePushInfos(msg) if err != nil { + log.ZError(ctx, "getOfflinePushInfos failed", err, "msg", msg) return err } err = c.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts) @@ -364,7 +366,7 @@ func (c *ConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, conten IsAtSelf bool `json:"isAtSelf"` } - opts = &options.Opts{Signal: &options.Signal{}} + opts = &options.Opts{Signal: &options.Signal{ClientMsgID: msg.ClientMsgID}} if msg.OfflinePushInfo != nil { opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index da6c63d60..c40018cbc 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -69,6 +69,7 @@ type Mongo struct { Database string `mapstructure:"database"` Username string `mapstructure:"username"` Password string `mapstructure:"password"` + AuthSource string `mapstructure:"authSource"` MaxPoolSize int `mapstructure:"maxPoolSize"` MaxRetry int `mapstructure:"maxRetry"` } @@ -212,12 +213,12 @@ type Push struct { FilePath string `mapstructure:"filePath"` AuthURL string `mapstructure:"authURL"` } `mapstructure:"fcm"` - JPNS struct { + JPush struct { AppKey string `mapstructure:"appKey"` MasterSecret string `mapstructure:"masterSecret"` PushURL string `mapstructure:"pushURL"` PushIntent string `mapstructure:"pushIntent"` - } `mapstructure:"jpns"` + } `mapstructure:"jpush"` IOSPush struct { PushSound string `mapstructure:"pushSound"` BadgeCount bool `mapstructure:"badgeCount"` @@ -490,6 +491,7 @@ func (m *Mongo) Build() *mongoutil.Config { Database: m.Database, Username: m.Username, Password: m.Password, + AuthSource: m.AuthSource, MaxPoolSize: m.MaxPoolSize, MaxRetry: m.MaxRetry, } diff --git a/pkg/common/storage/controller/auth.go b/pkg/common/storage/controller/auth.go index 82d6f9476..df1274967 100644 --- a/pkg/common/storage/controller/auth.go +++ b/pkg/common/storage/controller/auth.go @@ -2,15 +2,14 @@ package controller import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" - "github.com/openimsdk/tools/log" - "github.com/golang-jwt/jwt/v4" "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/tokenverify" ) diff --git a/scripts/create-topic.sh b/scripts/create-topic.sh deleted file mode 100755 index 866230e4f..000000000 --- a/scripts/create-topic.sh +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env bash -# Wait for Kafka to be ready - -KAFKA_SERVER=kafka-service:9092 - -MAX_ATTEMPTS=300 -attempt_num=1 - -echo "Waiting for Kafka to be ready..." - -until /opt/bitnami/kafka/bin/kafka-topics.sh --list --bootstrap-server $KAFKA_SERVER; do - echo "Attempt $attempt_num of $MAX_ATTEMPTS: Kafka not ready yet..." - if [ $attempt_num -eq $MAX_ATTEMPTS ]; then - echo "Kafka not ready after $MAX_ATTEMPTS attempts, exiting" - exit 1 - fi - attempt_num=$((attempt_num+1)) - sleep 1 -done - -echo "Kafka is ready. Creating topics..." - - -topics=("toRedis" "toMongo" "toPush" "toOfflinePush") -partitions=8 -replicationFactor=1 - -for topic in "${topics[@]}"; do - if /opt/bitnami/kafka/bin/kafka-topics.sh --create \ - --bootstrap-server $KAFKA_SERVER \ - --replication-factor $replicationFactor \ - --partitions $partitions \ - --topic $topic - then - echo "Topic $topic created." - else - echo "Failed to create topic $topic." - fi -done - -echo "All topics created." diff --git a/scripts/mongo-init.sh b/scripts/mongo-init.sh deleted file mode 100755 index 25bb2d654..000000000 --- a/scripts/mongo-init.sh +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright © 2023 OpenIM. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -mongosh < 0) { - try { - db = connect('mongodb://127.0.0.1:27017/admin'); - var authResult = db.auth(rootUsername, rootPassword); - if (authResult) { - print('Authentication successful for root user: ' + rootUsername); - connected = true; - } else { - print('Authentication failed for root user: ' + rootUsername + ' with password: ' + rootPassword); - quit(1); - } - } catch (e) { - maxRetries--; - print('Connection failed, retrying... Remaining attempts: ' + maxRetries); - sleep(1000); // Sleep for 1 second - } -} - -if (connected) { - db = db.getSiblingDB(dbName); - var createUserResult = db.createUser({ - user: openimUsername, - pwd: openimPassword, - roles: [{ - role: 'readWrite', - db: dbName - }] - }); - - if (createUserResult.ok == 1) { - print('User creation successful. User: ' + openimUsername + ', Database: ' + dbName); - } else { - print('User creation failed for user: ' + openimUsername + ' in database: ' + dbName); - quit(1); - } -} else { - print('Failed to connect to MongoDB after 300 retries.'); - quit(1); -} -EOF - - - - diff --git a/tools/check-component/main.go b/tools/check-component/main.go index 384d8c1b9..646228b8e 100644 --- a/tools/check-component/main.go +++ b/tools/check-component/main.go @@ -72,7 +72,7 @@ func CheckMinIO(ctx context.Context, config *config.Minio) error { } func CheckKafka(ctx context.Context, conf *config.Kafka) error { - return kafka.Check(ctx, conf.Build(), []string{conf.ToMongoTopic, conf.ToRedisTopic, conf.ToPushTopic, conf.ToOfflinePushTopic}) + return kafka.CheckHealth(ctx, conf.Build()) } func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka, *config.Minio, *config.Discovery, error) {