Merge branch 'main' into main

pull/1386/head
Xinwei Xiong 2 years ago committed by GitHub
commit 7994ba1557

@ -1,90 +0,0 @@
# Copyright © 2023 OpenIM. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: OpenIM API TEST
on:
  push:
    branches:
      - main
    paths-ignore:
      - "docs/**"
      - "README.md"
      - "README_zh-CN.md"
      - "CONTRIBUTING.md"
  pull_request:
    branches:
      - main
    paths-ignore:
      - "README.md"
      - "README_zh-CN.md"
      - "CONTRIBUTING.md"
      - "docs/**"

env:
  GO_VERSION: "1.19"
  GOLANGCI_VERSION: "v1.50.1"

jobs:
  execute-linux-systemd-scripts:
    name: Execute OpenIM script on ${{ matrix.os }}
    runs-on: ${{ matrix.os }}
    environment:
      name: openim
    strategy:
      matrix:
        go_version: ["1.20"]
        os: ["ubuntu-latest"]
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Go ${{ matrix.go_version }}
        uses: actions/setup-go@v4
        with:
          go-version: ${{ matrix.go_version }}
        id: go

      - name: Install Task
        uses: arduino/setup-task@v1
        with:
          version: '3.x' # If available, use the latest major version that's compatible
          repo-token: ${{ secrets.GITHUB_TOKEN }}

      - name: Docker Operations
        run: |
          curl -o docker-compose.yml https://raw.githubusercontent.com/OpenIMSDK/openim-docker/main/example/basic-openim-server-dependency.yml
          sudo docker compose up -d
          sudo sleep 60

      - name: Module Operations
        run: |
          sudo make tidy
          sudo make tools.verify.go-gitlint

      - name: Build, Start, Check Services and Print Logs
        run: |
          sudo ./scripts/install/install.sh -i && \
          sudo ./scripts/install/install.sh -s && \
          (echo "An error occurred, printing logs:" && sudo cat ./_output/logs/* 2>/dev/null)

      - name: Run Test
        run: |
          sudo make test-api && \
          (echo "An error occurred, printing logs:" && sudo cat ./_output/logs/* 2>/dev/null)

      - name: Stop Services
        run: |
          sudo ./scripts/install/install.sh -u && \
          (echo "An error occurred, printing logs:" && sudo cat ./_output/logs/* 2>/dev/null)

@ -15,11 +15,11 @@
name: Build OpenIM Web Docker image
on:
  # schedule:
  #   - cron: '30 3 * * *'
  push:
    branches:
      # - main
      - release-*
    tags:
      - v*

BIN chat (binary file, diff not shown)

@ -123,6 +123,30 @@ Explore our Helm-Charts repository and read through: [Helm-Charts Repository](ht
Using the helm charts repository, you can ignore the following configuration, but if you want to just use the server and scale on top of it, you can go ahead:
**Use the Helm template to generate the deployment yaml file: `openim-charts.yaml`**
**Gen Image:**
```bash
../scripts/genconfig.sh ../scripts/install/environment.sh ./templates/helm-image.yaml > ./charts/generated-configs/helm-image.yaml
```
**Gen Charts:**
```bash
for chart in ./charts/*/; do
  if [[ "$chart" == *"generated-configs"* || "$chart" == *"helmfile.yaml"* ]]; then
    continue
  fi
  if [ -f "${chart}values.yaml" ]; then
    helm template "$chart" -f "./charts/generated-configs/helm-image.yaml" -f "./charts/generated-configs/config.yaml" -f "./charts/generated-configs/notification.yaml" >> openim-charts.yaml
  else
    helm template "$chart" >> openim-charts.yaml
  fi
done
```
**Use Helmfile:**
```bash

File diff suppressed because it is too large.

@ -96,7 +96,14 @@ verifyCode:
    accessKeySecret: ""
    signName: ""
    verificationCodeTemplateCode: ""
  mail: # set senderMail, senderAuthorizationCode, smtpAddr and smtpPort to match the sending mailbox
    title: ""
    senderMail: "" # sender address
    senderAuthorizationCode: "" # SMTP authorization code
    smtpAddr: "smtp.qq.com" # SMTP server address
    smtpPort: 25 # SMTP server port used for sending mail
testDepartMentID: 001
imAPIURL: http://127.0.0.1:10002
###################### Proxy Header ######################
# Header used to read the client IP; if not configured, the remote address is used directly
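The mail block added above is only configuration. As a rough, hedged illustration of how such settings are typically consumed, the sketch below sends a message with Go's standard net/smtp package; it is not code from this commit, and the constant values merely mirror the YAML field names.

```go
package main

import (
	"fmt"
	"net/smtp"
)

// Values mirror the mail section above; a real server would read them from
// the parsed configuration instead of constants.
const (
	senderMail              = "sender@example.com" // hypothetical sender
	senderAuthorizationCode = "authorization-code" // hypothetical credential
	smtpAddr                = "smtp.qq.com"
	smtpPort                = 25
)

func main() {
	auth := smtp.PlainAuth("", senderMail, senderAuthorizationCode, smtpAddr)
	msg := []byte("Subject: verification code\r\n\r\nYour code is 123456\r\n")
	addr := fmt.Sprintf("%s:%d", smtpAddr, smtpPort)
	if err := smtp.SendMail(addr, auth, senderMail, []string{"user@example.com"}, msg); err != nil {
		fmt.Println("send mail:", err)
	}
}
```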

@ -45,6 +45,7 @@ require (
	github.com/redis/go-redis/v9 v9.2.1
	github.com/tencentyun/cos-go-sdk-v5 v0.7.45
	go.uber.org/automaxprocs v1.5.3
+	golang.org/x/sync v0.4.0
	gopkg.in/src-d/go-git.v4 v4.13.1
	gotest.tools v2.2.0+incompatible
)
@ -132,7 +133,6 @@ require (
	golang.org/x/arch v0.3.0 // indirect
	golang.org/x/net v0.17.0 // indirect
	golang.org/x/oauth2 v0.13.0 // indirect
-	golang.org/x/sync v0.4.0 // indirect
	golang.org/x/sys v0.13.0 // indirect
	golang.org/x/text v0.13.0 // indirect
	golang.org/x/time v0.3.0 // indirect
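golang.org/x/sync moves from the indirect to the direct require block because the changes below use errgroup with a bounded number of workers. A minimal sketch of that pattern (illustrative only, not taken from this commit):

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	wg := errgroup.Group{}
	wg.SetLimit(3) // at most three tasks run concurrently

	for i := 0; i < 10; i++ {
		i := i // capture the loop variable for the closure
		wg.Go(func() error {
			fmt.Println("task", i)
			return nil
		})
	}

	// Wait blocks until all tasks finish and returns the first non-nil error.
	if err := wg.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}
```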

@ -17,22 +17,19 @@ package msggateway
import (
	"context"

	"google.golang.org/grpc"

	"github.com/OpenIMSDK/protocol/constant"
	"github.com/OpenIMSDK/protocol/msggateway"
	"github.com/OpenIMSDK/tools/discoveryregistry"
	"github.com/OpenIMSDK/tools/errs"
	"github.com/OpenIMSDK/tools/log"
	"github.com/OpenIMSDK/tools/mcontext"
	"github.com/OpenIMSDK/tools/utils"
	"github.com/openimsdk/open-im-server/v3/pkg/authverify"
	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
	"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
	"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
)
@ -41,6 +38,7 @@ func (s *Server) InitServer(disCov discoveryregistry.SvcDiscoveryRegistry, serve
	if err != nil {
		return err
	}

	msgModel := cache.NewMsgCacheModel(rdb)
	s.LongConnServer.SetDiscoveryRegistry(disCov)
	s.LongConnServer.SetCacheHandler(msgModel)
@ -97,22 +95,25 @@ func (s *Server) GetUsersOnlineStatus(
		if !ok {
			continue
		}

		uresp := new(msggateway.GetUsersOnlineStatusResp_SuccessResult)
		uresp.UserID = userID
		for _, client := range clients {
			if client == nil {
				continue
			}

			ps := new(msggateway.GetUsersOnlineStatusResp_SuccessDetail)
			ps.Platform = constant.PlatformIDToName(client.PlatformID)
			ps.Status = constant.OnlineStatus
			ps.ConnID = client.ctx.GetConnID()
			ps.Token = client.token
			ps.IsBackground = client.IsBackground
			uresp.Status = constant.OnlineStatus
			uresp.DetailPlatformStatus = append(uresp.DetailPlatformStatus, ps)
		}
		if uresp.Status == constant.OnlineStatus {
			resp.SuccessResult = append(resp.SuccessResult, uresp)
		}
	}
	return &resp, nil
@ -129,50 +130,55 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(
	ctx context.Context,
	req *msggateway.OnlineBatchPushOneMsgReq,
) (*msggateway.OnlineBatchPushOneMsgResp, error) {
	var singleUserResults []*msggateway.SingleMsgToUserResults
	for _, v := range req.PushToUserIDs {
		var resp []*msggateway.SingleMsgToUserPlatform
		results := &msggateway.SingleMsgToUserResults{
			UserID: v,
		}
		clients, ok := s.LongConnServer.GetUserAllCons(v)
		if !ok {
			log.ZDebug(ctx, "push user not online", "userID", v)
			results.Resp = resp
			singleUserResults = append(singleUserResults, results)
			continue
		}
		log.ZDebug(ctx, "push user online", "clients", clients, "userID", v)
		for _, client := range clients {
			if client == nil {
				continue
			}

			userPlatform := &msggateway.SingleMsgToUserPlatform{
				RecvID:         v,
				RecvPlatFormID: int32(client.PlatformID),
			}
			if !client.IsBackground ||
				(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
				err := client.PushMessage(ctx, req.MsgData)
				if err != nil {
					userPlatform.ResultCode = -2
					resp = append(resp, userPlatform)
				} else {
					if utils.IsContainInt(client.PlatformID, s.pushTerminal) {
						results.OnlinePush = true
						resp = append(resp, userPlatform)
					}
				}
			} else {
				userPlatform.ResultCode = -3
				resp = append(resp, userPlatform)
			}
		}
		results.Resp = resp
		singleUserResults = append(singleUserResults, results)
	}
	return &msggateway.OnlineBatchPushOneMsgResp{
		SinglePushResult: singleUserResults,
	}, nil
}
@ -181,17 +187,21 @@ func (s *Server) KickUserOffline(
	req *msggateway.KickUserOfflineReq,
) (*msggateway.KickUserOfflineResp, error) {
	for _, v := range req.KickUserIDList {
		clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID))
		if !ok {
			log.ZInfo(ctx, "conn not exist", "userID", v, "platformID", req.PlatformID)
			continue
		}

		for _, client := range clients {
			log.ZDebug(ctx, "kick user offline", "userID", v, "platformID", req.PlatformID, "client", client)
			if err := client.longConnServer.KickUserConn(client); err != nil {
				log.ZWarn(ctx, "kick user offline failed", err, "userID", v, "platformID", req.PlatformID)
			}
		}
		continue
	}
	return &msggateway.KickUserOfflineResp{}, nil
}

@ -23,26 +23,22 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/OpenIMSDK/protocol/msggateway" "github.com/go-playground/validator/v10"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/OpenIMSDK/protocol/constant"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/msggateway"
"github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/go-playground/validator/v10"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
) )
type LongConnServer interface {
@ -61,12 +57,6 @@ type LongConnServer interface {
	MessageHandler
}

var bufferPool = sync.Pool{
	New: func() interface{} {
		return make([]byte, 1024)
	},
}

type WsServer struct {
	port         int
	wsMaxConnNum int64
@ -78,7 +68,6 @@ type WsServer struct {
	onlineUserNum     atomic.Int64
	onlineUserConnNum atomic.Int64
	handshakeTimeout  time.Duration
	hubServer         *Server
	validate          *validator.Validate
	cache             cache.MsgModel
	userClient        *rpcclient.UserRpcClient
@ -183,27 +172,39 @@ func (ws *WsServer) Run() error {
	return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) // Start listening
}

var concurrentRequest = 3

func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error {
	conns, err := ws.disCov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
	if err != nil {
		return err
	}

	wg := errgroup.Group{}
	wg.SetLimit(concurrentRequest)

	// Online push user online message to other node
	for _, v := range conns {
		v := v // safe closure var
		if v.Target() == ws.disCov.GetSelfConnTarget() {
			log.ZDebug(ctx, "Filter out this node", "node", v.Target())
			continue
		}

		wg.Go(func() error {
			msgClient := msggateway.NewMsgGatewayClient(v)
			_, err := msgClient.MultiTerminalLoginCheck(ctx, &msggateway.MultiTerminalLoginCheckReq{
				UserID:     client.UserID,
				PlatformID: int32(client.PlatformID), Token: client.token,
			})
			if err != nil {
				log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target())
			}
			return nil
		})
	}

	_ = wg.Wait()

	return nil
}
@ -289,70 +290,72 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
	}
		fallthrough
	case constant.AllLoginButSameTermKick:
		if !clientOK {
			return
		}

		isDeleteUser := ws.clients.deleteClients(newClient.UserID, oldClients)
		if isDeleteUser {
			ws.onlineUserNum.Add(-1)
		}
		for _, c := range oldClients {
			err := c.KickOnlineMessage()
			if err != nil {
				log.ZWarn(c.ctx, "KickOnlineMessage", err)
			}
		}
		m, err := ws.cache.GetTokensWithoutError(
			newClient.ctx,
			newClient.UserID,
			newClient.PlatformID,
		)
		if err != nil && err != redis.Nil {
			log.ZWarn(
				newClient.ctx,
				"get token from redis err",
				err,
				"userID",
				newClient.UserID,
				"platformID",
				newClient.PlatformID,
			)
			return
		}
		if m == nil {
			log.ZWarn(
				newClient.ctx,
				"m is nil",
				errors.New("m is nil"),
				"userID",
				newClient.UserID,
				"platformID",
				newClient.PlatformID,
			)
			return
		}
		log.ZDebug(
			newClient.ctx,
			"get token from redis",
			"userID",
			newClient.UserID,
			"platformID",
			newClient.PlatformID,
			"tokenMap",
			m,
		)

		for k := range m {
			if k != newClient.ctx.GetToken() {
				m[k] = constant.KickedToken
			}
		}
		log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID",
			newClient.UserID, "token", newClient.ctx.GetToken())
		err = ws.cache.SetTokenMapByUidPid(newClient.ctx, newClient.UserID, newClient.PlatformID, m)
		if err != nil {
			log.ZWarn(newClient.ctx, "SetTokenMapByUidPid err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID)
			return
		}
	}
}
@ -365,7 +368,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
	}
	ws.onlineUserConnNum.Add(-1)
	ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
	log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum.Load(), "online user conn Num",
		ws.onlineUserConnNum.Load(),
	)
}
@ -404,7 +407,7 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
		httpError(connContext, errs.ErrConnArgsErr)
		return
	}
	if err = authverify.WsVerifyToken(token, userID, platformID); err != nil {
		httpError(connContext, err)
		return
	}

@ -21,14 +21,13 @@ import (
"net/http" "net/http"
"sync" "sync"
"github.com/OpenIMSDK/tools/mw"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"github.com/OpenIMSDK/tools/mw"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"

@ -427,49 +427,62 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
			break
		}
	}

	log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
		claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())

	split := 1000
	rwLock := new(sync.RWMutex)
	messages := make([]*sarama.ConsumerMessage, 0, 1000)
	ticker := time.NewTicker(time.Millisecond * 100)

	go func() {
		for {
			select {
			case <-ticker.C:
				if len(messages) == 0 {
					continue
				}

				rwLock.Lock()
				buffer := make([]*sarama.ConsumerMessage, 0, len(messages))
				buffer = append(buffer, messages...)
				// reuse slice, set cap to 0
				messages = messages[:0]
				rwLock.Unlock()

				start := time.Now()
				ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator())
				log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(buffer))
				for i := 0; i < len(buffer)/split; i++ {
					och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
						ctx: ctx, cMsgList: buffer[i*split : (i+1)*split],
					}}
				}
				if (len(buffer) % split) > 0 {
					och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
						ctx: ctx, cMsgList: buffer[split*(len(buffer)/split):],
					}}
				}
				log.ZDebug(ctx, "timer trigger msg consumer end",
					"length", len(buffer), "time_cost", time.Since(start),
				)
			}
		}
	}()

	for msg := range claim.Messages() {
		if len(msg.Value) == 0 {
			continue
		}

		rwLock.Lock()
		messages = append(messages, msg)
		rwLock.Unlock()

		sess.MarkMessage(msg, "")
	}
	return nil
}

@ -17,13 +17,18 @@ package tools
import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/robfig/cron/v3"

	"github.com/OpenIMSDK/tools/log"

	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
	"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
)

func StartTask() error {
@ -32,23 +37,75 @@ func StartTask() error {
	if err != nil {
		return err
	}

	msgTool.convertTools()

	rdb, err := cache.NewRedis()
	if err != nil {
		return err
	}

	// register cron tasks
	var crontab = cron.New()
	log.ZInfo(context.Background(), "start chatRecordsClearTime cron task", "cron config", config.Config.ChatRecordsClearTime)
	_, err = crontab.AddFunc(config.Config.ChatRecordsClearTime, cronWrapFunc(rdb, "cron_clear_msg_and_fix_seq", msgTool.AllConversationClearMsgAndFixSeq))
	if err != nil {
		log.ZError(context.Background(), "start allConversationClearMsgAndFixSeq cron failed", err)
		panic(err)
	}

	log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.MsgDestructTime)
	_, err = crontab.AddFunc(config.Config.MsgDestructTime, cronWrapFunc(rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs))
	if err != nil {
		log.ZError(context.Background(), "start conversationsDestructMsgs cron failed", err)
		panic(err)
	}

	// start crontab
	crontab.Start()

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	<-sigs

	// stop crontab, wait for the running tasks to exit.
	ctx := crontab.Stop()
	select {
	case <-ctx.Done():
		// graceful exit
	case <-time.After(15 * time.Second):
		// forced exit on timeout
	}

	return nil
}

// netlock is a redis-based distributed lock.
func netlock(rdb redis.UniversalClient, key string, ttl time.Duration) bool {
	value := "used"
	ok, err := rdb.SetNX(context.Background(), key, value, ttl).Result() // nolint
	if err != nil {
		// on a redis error, treat the lock as not acquired.
		return false
	}

	return ok
}

func cronWrapFunc(rdb redis.UniversalClient, key string, fn func()) func() {
	enableCronLocker := config.Config.EnableCronLocker
	return func() {
		// if the cron-locker is not enabled, call fn directly.
		if !enableCronLocker {
			fn()
			return
		}

		// when the redis lock is acquired, call fn().
		if netlock(rdb, key, 5*time.Second) {
			fn()
		}
	}
}

@ -0,0 +1,82 @@
package tools
import (
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
func TestDisLock(t *testing.T) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
assert.Equal(t, true, netlock(rdb, "cron-1", 1*time.Second))
// if exists, get false
assert.Equal(t, false, netlock(rdb, "cron-1", 1*time.Second))
time.Sleep(2 * time.Second)
// wait for key on timeout, get true
assert.Equal(t, true, netlock(rdb, "cron-1", 2*time.Second))
// set different key
assert.Equal(t, true, netlock(rdb, "cron-2", 2*time.Second))
}
func TestCronWrapFunc(t *testing.T) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
once := sync.Once{}
done := make(chan struct{}, 1)
cb := func() {
once.Do(func() {
close(done)
})
}
start := time.Now()
key := fmt.Sprintf("cron-%v", rand.Int31())
crontab := cron.New(cron.WithSeconds())
crontab.AddFunc("*/1 * * * * *", cronWrapFunc(rdb, key, cb))
crontab.Start()
<-done
dur := time.Since(start)
assert.LessOrEqual(t, dur.Seconds(), float64(2*time.Second))
crontab.Stop()
}
func TestCronWrapFuncWithNetlock(t *testing.T) {
config.Config.EnableCronLocker = true
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
done := make(chan string, 10)
crontab := cron.New(cron.WithSeconds())
key := fmt.Sprintf("cron-%v", rand.Int31())
crontab.AddFunc("*/1 * * * * *", cronWrapFunc(rdb, key, func() {
done <- "host1"
}))
crontab.AddFunc("*/1 * * * * *", cronWrapFunc(rdb, key, func() {
done <- "host2"
}))
crontab.Start()
time.Sleep(12 * time.Second)
// the ttl of netlock is 5s, so expected value is 2.
assert.Equal(t, len(done), 2)
crontab.Stop()
}

@ -22,7 +22,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
) )
func (c *MsgTool) ConvertTools() { func (c *MsgTool) convertTools() {
ctx := mcontext.NewCtx("convert") ctx := mcontext.NewCtx("convert")
conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx) conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
if err != nil { if err != nil {

@ -78,10 +78,11 @@ type configStruct struct {
} `yaml:"mongo"` } `yaml:"mongo"`
Redis struct { Redis struct {
ClusterMode bool `yaml:"clusterMode"` ClusterMode bool `yaml:"clusterMode"`
Address []string `yaml:"address"` Address []string `yaml:"address"`
Username string `yaml:"username"` Username string `yaml:"username"`
Password string `yaml:"password"` Password string `yaml:"password"`
EnablePipeline bool `yaml:"enablePipeline"`
} `yaml:"redis"` } `yaml:"redis"`
Kafka struct { Kafka struct {
@ -231,6 +232,7 @@ type configStruct struct {
ChatRecordsClearTime string `yaml:"chatRecordsClearTime"` ChatRecordsClearTime string `yaml:"chatRecordsClearTime"`
MsgDestructTime string `yaml:"msgDestructTime"` MsgDestructTime string `yaml:"msgDestructTime"`
Secret string `yaml:"secret"` Secret string `yaml:"secret"`
EnableCronLocker bool `yaml:"enableCronLocker"`
TokenPolicy struct { TokenPolicy struct {
Expire int64 `yaml:"expire"` Expire int64 `yaml:"expire"`
} `yaml:"tokenPolicy"` } `yaml:"tokenPolicy"`

@ -28,12 +28,21 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
) )
var (
// singleton pattern.
redisClient redis.UniversalClient
)
const ( const (
maxRetry = 10 // number of retries maxRetry = 10 // number of retries
) )
// NewRedis Initialize redis connection. // NewRedis Initialize redis connection.
func NewRedis() (redis.UniversalClient, error) { func NewRedis() (redis.UniversalClient, error) {
if redisClient != nil {
return redisClient, nil
}
if len(config.Config.Redis.Address) == 0 { if len(config.Config.Redis.Address) == 0 {
return nil, errors.New("redis address is empty") return nil, errors.New("redis address is empty")
} }
@ -66,5 +75,6 @@ func NewRedis() (redis.UniversalClient, error) {
return nil, fmt.Errorf("redis ping %w", err) return nil, fmt.Errorf("redis ping %w", err)
} }
redisClient = rdb
return rdb, err return rdb, err
} }
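The lazily initialized redisClient above is not guarded against concurrent callers; that is fine for the current single-threaded startup path, but if NewRedis could ever be reached from several goroutines at once, a sync.Once-based variant avoids a race on the package-level variable. A sketch under that assumption (the names here are illustrative, not part of this commit):

```go
package cache

import (
	"sync"

	"github.com/redis/go-redis/v9"
)

var (
	sharedClient redis.UniversalClient
	clientOnce   sync.Once
	clientErr    error
)

// newRedisClient stands in for the existing initialization body of NewRedis.
func newRedisClient() (redis.UniversalClient, error) {
	return redis.NewClient(&redis.Options{}), nil
}

// NewRedisOnce initializes the shared client at most once, even when called
// from multiple goroutines.
func NewRedisOnce() (redis.UniversalClient, error) {
	clientOnce.Do(func() {
		sharedClient, clientErr = newRedisClient()
	})
	return sharedClient, clientErr
}
```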

@ -21,6 +21,7 @@ import (
"time" "time"
"github.com/dtm-labs/rockscache" "github.com/dtm-labs/rockscache"
"golang.org/x/sync/errgroup"
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
@ -62,6 +63,8 @@ const (
uidPidToken = "UID_PID_TOKEN_STATUS:" uidPidToken = "UID_PID_TOKEN_STATUS:"
) )
var concurrentLimit = 3
type SeqCache interface { type SeqCache interface {
SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
@ -345,85 +348,165 @@ func (c *msgCache) allMessageCacheKey(conversationID string) string {
}

func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
	if config.Config.Redis.EnablePipeline {
		return c.PipeGetMessagesBySeq(ctx, conversationID, seqs)
	}

	return c.ParallelGetMessagesBySeq(ctx, conversationID, seqs)
}

func (c *msgCache) PipeGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
	pipe := c.rdb.Pipeline()

	results := []*redis.StringCmd{}
	for _, seq := range seqs {
		results = append(results, pipe.Get(ctx, c.getMessageCacheKey(conversationID, seq)))
	}

	_, err = pipe.Exec(ctx)
	if err != nil && err != redis.Nil {
		return seqMsgs, failedSeqs, errs.Wrap(err, "pipe.get")
	}

	for idx, res := range results {
		seq := seqs[idx]
		if res.Err() != nil {
			log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq, "err", res.Err())
			failedSeqs = append(failedSeqs, seq)
			continue
		}

		msg := sdkws.MsgData{}
		if err = msgprocessor.String2Pb(res.Val(), &msg); err != nil {
			log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq)
			failedSeqs = append(failedSeqs, seq)
			continue
		}

		if msg.Status == constant.MsgDeleted {
			failedSeqs = append(failedSeqs, seq)
			continue
		}

		seqMsgs = append(seqMsgs, &msg)
	}

	return
}

func (c *msgCache) ParallelGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
	type entry struct {
		err error
		msg *sdkws.MsgData
	}

	wg := errgroup.Group{}
	wg.SetLimit(concurrentLimit)

	results := make([]entry, len(seqs)) // set slice len/cap to length of seqs.
	for idx, seq := range seqs {
		// closure safe var
		idx := idx
		seq := seq

		wg.Go(func() error {
			res, err := c.rdb.Get(ctx, c.getMessageCacheKey(conversationID, seq)).Result()
			if err != nil {
				log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq)
				results[idx] = entry{err: err}
				return nil
			}

			msg := sdkws.MsgData{}
			if err = msgprocessor.String2Pb(res, &msg); err != nil {
				log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq)
				results[idx] = entry{err: err}
				return nil
			}

			if msg.Status == constant.MsgDeleted {
				results[idx] = entry{err: err}
				return nil
			}

			results[idx] = entry{msg: &msg}
			return nil
		})
	}

	_ = wg.Wait()

	for idx, res := range results {
		if res.err != nil {
			failedSeqs = append(failedSeqs, seqs[idx])
			continue
		}

		seqMsgs = append(seqMsgs, res.msg)
	}

	return
}

func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
	if config.Config.Redis.EnablePipeline {
		return c.PipeSetMessageToCache(ctx, conversationID, msgs)
	}

	return c.ParallelSetMessageToCache(ctx, conversationID, msgs)
}

func (c *msgCache) PipeSetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
	pipe := c.rdb.Pipeline()
	for _, msg := range msgs {
		s, err := msgprocessor.Pb2String(msg)
		if err != nil {
			return 0, errs.Wrap(err, "pb.marshal")
		}

		key := c.getMessageCacheKey(conversationID, msg.Seq)
		_ = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second)
	}

	results, err := pipe.Exec(ctx)
	if err != nil {
		return 0, errs.Wrap(err, "pipe.set")
	}

	for _, res := range results {
		if res.Err() != nil {
			return 0, errs.Wrap(err)
		}
	}

	return len(msgs), nil
}

func (c *msgCache) ParallelSetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
	wg := errgroup.Group{}
	wg.SetLimit(concurrentLimit)

	for _, msg := range msgs {
		msg := msg // closure safe var
		wg.Go(func() error {
			s, err := msgprocessor.Pb2String(msg)
			if err != nil {
				return errs.Wrap(err)
			}

			key := c.getMessageCacheKey(conversationID, msg.Seq)
			if err := c.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
				return errs.Wrap(err)
			}

			return nil
		})
	}

	err := wg.Wait()
	if err != nil {
		return 0, err
	}

	return len(msgs), nil
}

func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string {

@ -0,0 +1,251 @@
package cache
import (
"context"
"fmt"
"math/rand"
"testing"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
)
func TestParallelSetMessageToCache(t *testing.T) {
var (
cid = fmt.Sprintf("cid-%v", rand.Int63())
seqFirst = rand.Int63()
msgs = []*sdkws.MsgData{}
)
for i := 0; i < 100; i++ {
msgs = append(msgs, &sdkws.MsgData{
Seq: seqFirst + int64(i),
})
}
testParallelSetMessageToCache(t, cid, msgs)
}
func testParallelSetMessageToCache(t *testing.T, cid string, msgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
ret, err := cacher.ParallelSetMessageToCache(context.Background(), cid, msgs)
assert.Nil(t, err)
assert.Equal(t, len(msgs), ret)
// validate
for _, msg := range msgs {
key := cacher.getMessageCacheKey(cid, msg.Seq)
val, err := rdb.Exists(context.Background(), key).Result()
assert.Nil(t, err)
assert.EqualValues(t, 1, val)
}
}
func TestPipeSetMessageToCache(t *testing.T) {
var (
cid = fmt.Sprintf("cid-%v", rand.Int63())
seqFirst = rand.Int63()
msgs = []*sdkws.MsgData{}
)
for i := 0; i < 100; i++ {
msgs = append(msgs, &sdkws.MsgData{
Seq: seqFirst + int64(i),
})
}
testPipeSetMessageToCache(t, cid, msgs)
}
func testPipeSetMessageToCache(t *testing.T, cid string, msgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
ret, err := cacher.PipeSetMessageToCache(context.Background(), cid, msgs)
assert.Nil(t, err)
assert.Equal(t, len(msgs), ret)
// validate
for _, msg := range msgs {
key := cacher.getMessageCacheKey(cid, msg.Seq)
val, err := rdb.Exists(context.Background(), key).Result()
assert.Nil(t, err)
assert.EqualValues(t, 1, val)
}
}
func TestGetMessagesBySeq(t *testing.T) {
var (
cid = fmt.Sprintf("cid-%v", rand.Int63())
seqFirst = rand.Int63()
msgs = []*sdkws.MsgData{}
)
seqs := []int64{}
for i := 0; i < 100; i++ {
msgs = append(msgs, &sdkws.MsgData{
Seq: seqFirst + int64(i),
SendID: fmt.Sprintf("fake-sendid-%v", i),
})
seqs = append(seqs, seqFirst+int64(i))
}
// set data to cache
testPipeSetMessageToCache(t, cid, msgs)
// get data from cache with parallet mode
testParallelGetMessagesBySeq(t, cid, seqs, msgs)
// get data from cache with pipeline mode
testPipeGetMessagesBySeq(t, cid, seqs, msgs)
}
func testParallelGetMessagesBySeq(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
respMsgs, failedSeqs, err := cacher.ParallelGetMessagesBySeq(context.Background(), cid, seqs)
assert.Nil(t, err)
assert.Equal(t, 0, len(failedSeqs))
assert.Equal(t, len(respMsgs), len(seqs))
// validate
for idx, msg := range respMsgs {
assert.Equal(t, msg.Seq, inputMsgs[idx].Seq)
assert.Equal(t, msg.SendID, inputMsgs[idx].SendID)
}
}
func testPipeGetMessagesBySeq(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
respMsgs, failedSeqs, err := cacher.PipeGetMessagesBySeq(context.Background(), cid, seqs)
assert.Nil(t, err)
assert.Equal(t, 0, len(failedSeqs))
assert.Equal(t, len(respMsgs), len(seqs))
// validate
for idx, msg := range respMsgs {
assert.Equal(t, msg.Seq, inputMsgs[idx].Seq)
assert.Equal(t, msg.SendID, inputMsgs[idx].SendID)
}
}
func TestGetMessagesBySeqWithEmptySeqs(t *testing.T) {
var (
cid = fmt.Sprintf("cid-%v", rand.Int63())
seqFirst int64 = 0
msgs = []*sdkws.MsgData{}
)
seqs := []int64{}
for i := 0; i < 100; i++ {
msgs = append(msgs, &sdkws.MsgData{
Seq: seqFirst + int64(i),
SendID: fmt.Sprintf("fake-sendid-%v", i),
})
seqs = append(seqs, seqFirst+int64(i))
}
// don't set cache, only get data from cache.
// get data from cache with parallet mode
testParallelGetMessagesBySeqWithEmptry(t, cid, seqs, msgs)
// get data from cache with pipeline mode
testPipeGetMessagesBySeqWithEmptry(t, cid, seqs, msgs)
}
func testParallelGetMessagesBySeqWithEmptry(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
respMsgs, failedSeqs, err := cacher.ParallelGetMessagesBySeq(context.Background(), cid, seqs)
assert.Nil(t, err)
assert.Equal(t, len(seqs), len(failedSeqs))
assert.Equal(t, 0, len(respMsgs))
}
func testPipeGetMessagesBySeqWithEmptry(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
respMsgs, failedSeqs, err := cacher.PipeGetMessagesBySeq(context.Background(), cid, seqs)
assert.Equal(t, err, redis.Nil)
assert.Equal(t, len(seqs), len(failedSeqs))
assert.Equal(t, 0, len(respMsgs))
}
func TestGetMessagesBySeqWithLostHalfSeqs(t *testing.T) {
var (
cid = fmt.Sprintf("cid-%v", rand.Int63())
seqFirst int64 = 0
msgs = []*sdkws.MsgData{}
)
seqs := []int64{}
for i := 0; i < 100; i++ {
msgs = append(msgs, &sdkws.MsgData{
Seq: seqFirst + int64(i),
SendID: fmt.Sprintf("fake-sendid-%v", i),
})
seqs = append(seqs, seqFirst+int64(i))
}
// Only set half the number of messages.
testParallelSetMessageToCache(t, cid, msgs[:50])
// get data from cache with parallet mode
testParallelGetMessagesBySeqWithLostHalfSeqs(t, cid, seqs, msgs)
// get data from cache with pipeline mode
testPipeGetMessagesBySeqWithLostHalfSeqs(t, cid, seqs, msgs)
}
func testParallelGetMessagesBySeqWithLostHalfSeqs(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
respMsgs, failedSeqs, err := cacher.ParallelGetMessagesBySeq(context.Background(), cid, seqs)
assert.Nil(t, err)
assert.Equal(t, len(seqs)/2, len(failedSeqs))
assert.Equal(t, len(seqs)/2, len(respMsgs))
for idx, msg := range respMsgs {
assert.Equal(t, msg.Seq, seqs[idx])
}
}
func testPipeGetMessagesBySeqWithLostHalfSeqs(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
respMsgs, failedSeqs, err := cacher.PipeGetMessagesBySeq(context.Background(), cid, seqs)
assert.Nil(t, err)
assert.Equal(t, len(seqs)/2, len(failedSeqs))
assert.Equal(t, len(seqs)/2, len(respMsgs))
for idx, msg := range respMsgs {
assert.Equal(t, msg.Seq, seqs[idx])
}
}

@ -16,9 +16,14 @@ package controller
import (
	"context"
	"encoding/json"
	"errors"
	"time"

	"github.com/OpenIMSDK/protocol/constant"
	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
	"github.com/redis/go-redis/v9"

	"github.com/OpenIMSDK/tools/errs"
@ -32,7 +37,6 @@ import (
	unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
	"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
	"github.com/openimsdk/open-im-server/v3/pkg/common/kafka"

	pbmsg "github.com/OpenIMSDK/protocol/msg"
	"github.com/OpenIMSDK/protocol/sdkws"
@ -398,7 +402,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
	for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) {
		// log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
		msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs)
		if err != nil {
			return nil, err
		}
@ -409,12 +413,70 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversat
	return totalMsgs, nil
}

func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*unrelationtb.MsgInfoModel, userID, conversationID string, msg *unrelationtb.MsgInfoModel) {
	if msg.IsRead {
		msg.Msg.IsRead = true
	}
	if msg.Msg.ContentType != constant.Quote {
		return
	}
	if msg.Msg.Content == "" {
		return
	}

	var quoteMsg struct {
		Text              string          `json:"text,omitempty"`
		QuoteMessage      *sdkws.MsgData  `json:"quoteMessage,omitempty"`
		MessageEntityList json.RawMessage `json:"messageEntityList,omitempty"`
	}
	if err := json.Unmarshal([]byte(msg.Msg.Content), &quoteMsg); err != nil {
		log.ZError(ctx, "json.Unmarshal", err)
		return
	}
	if quoteMsg.QuoteMessage == nil || quoteMsg.QuoteMessage.ContentType == constant.MsgRevokeNotification {
		return
	}
	var msgs []*unrelationtb.MsgInfoModel
	if v, ok := cache[quoteMsg.QuoteMessage.Seq]; ok {
		msgs = v
	} else {
		if quoteMsg.QuoteMessage.Seq > 0 {
			ms, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, db.msg.GetDocID(conversationID, quoteMsg.QuoteMessage.Seq), []int64{quoteMsg.QuoteMessage.Seq})
			if err != nil {
				log.ZError(ctx, "GetMsgBySeqIndexIn1Doc", err, "conversationID", conversationID, "seq", quoteMsg.QuoteMessage.Seq)
				return
			}
			msgs = ms
			cache[quoteMsg.QuoteMessage.Seq] = ms
		}
	}
	if len(msgs) != 0 && msgs[0].Msg.ContentType != constant.MsgRevokeNotification {
		return
	}
	quoteMsg.QuoteMessage.ContentType = constant.MsgRevokeNotification
	if len(msgs) > 0 {
		quoteMsg.QuoteMessage.Content = []byte(msgs[0].Msg.Content)
	} else {
		quoteMsg.QuoteMessage.Content = []byte("{}")
	}
	data, err := json.Marshal(&quoteMsg)
	if err != nil {
		log.ZError(ctx, "json.Marshal", err)
		return
	}
	msg.Msg.Content = string(data)
	if _, err := db.msgDocDatabase.UpdateMsg(ctx, db.msg.GetDocID(conversationID, msg.Msg.Seq), db.msg.GetMsgIndex(msg.Msg.Seq), "msg", msg.Msg); err != nil {
		log.ZError(ctx, "UpdateMsgContent", err)
	}
}

func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, conversationID string, seqs []int64) (totalMsgs []*unrelationtb.MsgInfoModel, err error) {
	msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs)
	if err != nil {
		return nil, err
	}

	tempCache := make(map[int64][]*unrelationtb.MsgInfoModel)
	for _, msg := range msgs {
		db.handlerDBMsg(ctx, tempCache, userID, conversationID, msg)
	}

	return msgs, err
}
@ -423,7 +485,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end) log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end)
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) {
log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs)
msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs) msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs)
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -144,9 +144,9 @@ func Test_BatchInsertChat2DB(t *testing.T) {
}

func GetDB() *commonMsgDatabase {
	config.Config.Mongo.Address = []string{"203.56.175.233:37017"}
	// config.Config.Mongo.Timeout = 60
	config.Config.Mongo.Database = "openIM_v3"
	// config.Config.Mongo.Source = "admin"
	config.Config.Mongo.Username = "root"
	config.Config.Mongo.Password = "openIM123"
@ -232,37 +232,17 @@ func Test_FindBySeq(t *testing.T) {
// }
//}

func TestName(t *testing.T) {
	db := GetDB()
	var seqs []int64
	for i := int64(1); i <= 4; i++ {
		seqs = append(seqs, i)
	}
	msgs, err := db.getMsgBySeqsRange(context.Background(), "4931176757", "si_3866692501_4931176757", seqs, seqs[0], seqs[len(seqs)-1])
	if err != nil {
		t.Fatal(err)
	}

	t.Log(msgs)
}

@ -175,7 +175,6 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa
		return nil, err
	}
	if md5Sum := md5.Sum([]byte(strings.Join(partHashs, partSeparator))); hex.EncodeToString(md5Sum[:]) != upload.Hash {
		return nil, errors.New("md5 mismatching")
	}
	if info, err := c.StatObject(ctx, c.HashPath(upload.Hash)); err == nil {

@ -3,7 +3,7 @@ package ginprometheus
import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"os"
	"strconv"
@ -17,40 +17,42 @@ import (
var defaultMetricPath = "/metrics"

// counter, counter_vec, gauge, gauge_vec,
// histogram, histogram_vec, summary, summary_vec.
var (
	reqCounter = &Metric{
		ID:          "reqCnt",
		Name:        "requests_total",
		Description: "How many HTTP requests processed, partitioned by status code and HTTP method.",
		Type:        "counter_vec",
		Args:        []string{"code", "method", "handler", "host", "url"}}

	reqDuration = &Metric{
		ID:          "reqDur",
		Name:        "request_duration_seconds",
		Description: "The HTTP request latencies in seconds.",
		Type:        "histogram_vec",
		Args:        []string{"code", "method", "url"},
	}

	resSize = &Metric{
		ID:          "resSz",
		Name:        "response_size_bytes",
		Description: "The HTTP response sizes in bytes.",
		Type:        "summary"}

	reqSize = &Metric{
		ID:          "reqSz",
		Name:        "request_size_bytes",
		Description: "The HTTP request sizes in bytes.",
		Type:        "summary"}

	standardMetrics = []*Metric{
		reqCounter,
		reqDuration,
		resSize,
		reqSize,
	}
)
/*
RequestCounterURLLabelMappingFn is a function which can be supplied to the middleware to control
@ -74,7 +76,7 @@ which would map "/customer/alice" and "/customer/bob" to their template "/custom
type RequestCounterURLLabelMappingFn func(c *gin.Context) string

// Metric is a definition for the name, description, type, ID, and
// prometheus.Collector type (i.e. CounterVec, Summary, etc) of each metric.
type Metric struct {
	MetricCollector prometheus.Collector
	ID              string
@ -84,7 +86,7 @@ type Metric struct {
	Args            []string
}

// Prometheus contains the metrics gathered by the instance and its path.
type Prometheus struct {
	reqCnt *prometheus.CounterVec
	reqDur *prometheus.HistogramVec
@ -102,7 +104,7 @@ type Prometheus struct {
	URLLabelFromContext string
}

// PrometheusPushGateway contains the configuration for pushing to a Prometheus pushgateway (optional).
type PrometheusPushGateway struct {
	// Push interval in seconds
@ -112,7 +114,7 @@ type PrometheusPushGateway struct {
// where JOBNAME can be any string of your choice // where JOBNAME can be any string of your choice
PushGatewayURL string PushGatewayURL string
// Local metrics URL where metrics are fetched from, this could be ommited in the future // Local metrics URL where metrics are fetched from, this could be omitted in the future
// if implemented using prometheus common/expfmt instead // if implemented using prometheus common/expfmt instead
MetricsURL string MetricsURL string
@ -120,9 +122,11 @@ type PrometheusPushGateway struct {
Job string Job string
} }
// NewPrometheus generates a new set of metrics with a certain subsystem name // NewPrometheus generates a new set of metrics with a certain subsystem name.
func NewPrometheus(subsystem string, customMetricsList ...[]*Metric) *Prometheus { func NewPrometheus(subsystem string, customMetricsList ...[]*Metric) *Prometheus {
subsystem = "app" if subsystem == "" {
subsystem = "app"
}
var metricsList []*Metric var metricsList []*Metric
@ -131,16 +135,13 @@ func NewPrometheus(subsystem string, customMetricsList ...[]*Metric) *Prometheus
} else if len(customMetricsList) == 1 { } else if len(customMetricsList) == 1 {
metricsList = customMetricsList[0] metricsList = customMetricsList[0]
} }
metricsList = append(metricsList, standardMetrics...)
for _, metric := range standardMetrics {
metricsList = append(metricsList, metric)
}
p := &Prometheus{ p := &Prometheus{
MetricsList: metricsList, MetricsList: metricsList,
MetricsPath: defaultMetricPath, MetricsPath: defaultMetricPath,
ReqCntURLLabelMappingFn: func(c *gin.Context) string { ReqCntURLLabelMappingFn: func(c *gin.Context) string {
return c.Request.URL.Path return c.FullPath() // e.g. /user/:id , /user/:id/info
}, },
} }
@ -150,7 +151,7 @@ func NewPrometheus(subsystem string, customMetricsList ...[]*Metric) *Prometheus
} }
// SetPushGateway sends metrics to a remote pushgateway exposed on pushGatewayURL // SetPushGateway sends metrics to a remote pushgateway exposed on pushGatewayURL
// every pushIntervalSeconds. Metrics are fetched from metricsURL // every pushIntervalSeconds. Metrics are fetched from metricsURL.
func (p *Prometheus) SetPushGateway(pushGatewayURL, metricsURL string, pushIntervalSeconds time.Duration) { func (p *Prometheus) SetPushGateway(pushGatewayURL, metricsURL string, pushIntervalSeconds time.Duration) {
p.Ppg.PushGatewayURL = pushGatewayURL p.Ppg.PushGatewayURL = pushGatewayURL
p.Ppg.MetricsURL = metricsURL p.Ppg.MetricsURL = metricsURL
@ -158,13 +159,13 @@ func (p *Prometheus) SetPushGateway(pushGatewayURL, metricsURL string, pushInter
p.startPushTicker() p.startPushTicker()
} }
// SetPushGatewayJob job name, defaults to "gin" // SetPushGatewayJob job name, defaults to "gin".
func (p *Prometheus) SetPushGatewayJob(j string) { func (p *Prometheus) SetPushGatewayJob(j string) {
p.Ppg.Job = j p.Ppg.Job = j
} }
// SetListenAddress for exposing metrics on address. If not set, it will be exposed at the // SetListenAddress for exposing metrics on address. If not set, it will be exposed at the
// same address of the gin engine that is being used // same address of the gin engine that is being used.
func (p *Prometheus) SetListenAddress(address string) { func (p *Prometheus) SetListenAddress(address string) {
p.listenAddress = address p.listenAddress = address
if p.listenAddress != "" { if p.listenAddress != "" {
@ -181,7 +182,7 @@ func (p *Prometheus) SetListenAddressWithRouter(listenAddress string, r *gin.Eng
} }
} }
// SetMetricsPath set metrics paths // SetMetricsPath set metrics paths.
func (p *Prometheus) SetMetricsPath(e *gin.Engine) { func (p *Prometheus) SetMetricsPath(e *gin.Engine) {
if p.listenAddress != "" { if p.listenAddress != "" {
@ -192,7 +193,7 @@ func (p *Prometheus) SetMetricsPath(e *gin.Engine) {
} }
} }
// SetMetricsPathWithAuth set metrics paths with authentication // SetMetricsPathWithAuth set metrics paths with authentication.
func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) { func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) {
if p.listenAddress != "" { if p.listenAddress != "" {
@ -205,34 +206,43 @@ func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts
} }
func (p *Prometheus) runServer() { func (p *Prometheus) runServer() {
if p.listenAddress != "" { go p.router.Run(p.listenAddress)
go p.router.Run(p.listenAddress)
}
} }
func (p *Prometheus) getMetrics() []byte { func (p *Prometheus) getMetrics() []byte {
response, _ := http.Get(p.Ppg.MetricsURL) response, err := http.Get(p.Ppg.MetricsURL)
if err != nil {
return nil
}
defer response.Body.Close() defer response.Body.Close()
body, _ := ioutil.ReadAll(response.Body)
body, _ := io.ReadAll(response.Body)
return body return body
} }
var hostname, _ = os.Hostname()
func (p *Prometheus) getPushGatewayURL() string { func (p *Prometheus) getPushGatewayURL() string {
h, _ := os.Hostname()
if p.Ppg.Job == "" { if p.Ppg.Job == "" {
p.Ppg.Job = "gin" p.Ppg.Job = "gin"
} }
return p.Ppg.PushGatewayURL + "/metrics/job/" + p.Ppg.Job + "/instance/" + h return p.Ppg.PushGatewayURL + "/metrics/job/" + p.Ppg.Job + "/instance/" + hostname
} }
func (p *Prometheus) sendMetricsToPushGateway(metrics []byte) { func (p *Prometheus) sendMetricsToPushGateway(metrics []byte) {
req, err := http.NewRequest("POST", p.getPushGatewayURL(), bytes.NewBuffer(metrics)) req, err := http.NewRequest("POST", p.getPushGatewayURL(), bytes.NewBuffer(metrics))
if err != nil {
return
}
client := &http.Client{} client := &http.Client{}
if _, err = client.Do(req); err != nil { resp, err := client.Do(req)
if err != nil {
fmt.Println("Error sending to push gateway error:", err.Error()) fmt.Println("Error sending to push gateway error:", err.Error())
return
} }
resp.Body.Close()
} }
func (p *Prometheus) startPushTicker() { func (p *Prometheus) startPushTicker() {
@ -244,7 +254,7 @@ func (p *Prometheus) startPushTicker() {
}() }()
} }
// NewMetric associates prometheus.Collector based on Metric.Type // NewMetric associates prometheus.Collector based on Metric.Type.
func NewMetric(m *Metric, subsystem string) prometheus.Collector { func NewMetric(m *Metric, subsystem string) prometheus.Collector {
var metric prometheus.Collector var metric prometheus.Collector
switch m.Type { switch m.Type {
@ -321,20 +331,20 @@ func NewMetric(m *Metric, subsystem string) prometheus.Collector {
} }
func (p *Prometheus) registerMetrics(subsystem string) { func (p *Prometheus) registerMetrics(subsystem string) {
for _, metricDef := range p.MetricsList { for _, metricDef := range p.MetricsList {
metric := NewMetric(metricDef, subsystem) metric := NewMetric(metricDef, subsystem)
if err := prometheus.Register(metric); err != nil { if err := prometheus.Register(metric); err != nil {
fmt.Println("could not be registered in Prometheus,metricDef.Name:", metricDef.Name, " error:", err.Error()) fmt.Println("could not be registered in Prometheus,metricDef.Name:", metricDef.Name, " error:", err.Error())
} }
switch metricDef { switch metricDef {
case reqCnt: case reqCounter:
p.reqCnt = metric.(*prometheus.CounterVec) p.reqCnt = metric.(*prometheus.CounterVec)
case reqDur: case reqDuration:
p.reqDur = metric.(*prometheus.HistogramVec) p.reqDur = metric.(*prometheus.HistogramVec)
case resSz: case resSize:
p.resSz = metric.(prometheus.Summary) p.resSz = metric.(prometheus.Summary)
case reqSz: case reqSize:
p.reqSz = metric.(prometheus.Summary) p.reqSz = metric.(prometheus.Summary)
} }
metricDef.MetricCollector = metric metricDef.MetricCollector = metric
@ -353,7 +363,7 @@ func (p *Prometheus) UseWithAuth(e *gin.Engine, accounts gin.Accounts) {
p.SetMetricsPathWithAuth(e, accounts) p.SetMetricsPathWithAuth(e, accounts)
} }
// HandlerFunc defines handler function for middleware // HandlerFunc defines handler function for middleware.
func (p *Prometheus) HandlerFunc() gin.HandlerFunc { func (p *Prometheus) HandlerFunc() gin.HandlerFunc {
return func(c *gin.Context) { return func(c *gin.Context) {
if c.Request.URL.Path == p.MetricsPath { if c.Request.URL.Path == p.MetricsPath {
@ -393,7 +403,7 @@ func prometheusHandler() gin.HandlerFunc {
} }
func computeApproximateRequestSize(r *http.Request) int { func computeApproximateRequestSize(r *http.Request) int {
s := 0 var s int
if r.URL != nil { if r.URL != nil {
s = len(r.URL.Path) s = len(r.URL.Path)
} }
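
Two behavioral changes in this refactor are worth calling out: an empty subsystem now falls back to "app" instead of always being overwritten, and the request-counter URL label comes from `c.FullPath()`, so `/user/alice` and `/user/bob` are both counted under their route template `/user/:id`, keeping label cardinality bounded. A minimal wiring sketch follows; the import path and routes are illustrative assumptions, while `NewPrometheus`, `HandlerFunc`, and `SetMetricsPath` are the functions shown above.

```go
package main

import (
	"net/http"

	"github.com/gin-gonic/gin"

	// Assumed import path for the ginprometheus package shown in this diff.
	ginprometheus "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
)

func main() {
	r := gin.Default()

	// An empty subsystem now defaults to "app" inside NewPrometheus.
	p := ginprometheus.NewPrometheus("")
	r.Use(p.HandlerFunc()) // record counters, durations, and sizes per request
	p.SetMetricsPath(r)    // expose /metrics on the same engine

	// Both /user/alice and /user/bob are reported under the "/user/:id" label
	// because ReqCntURLLabelMappingFn returns c.FullPath().
	r.GET("/user/:id", func(c *gin.Context) {
		c.String(http.StatusOK, "hello %s", c.Param("id"))
	})

	_ = r.Run(":8080")
}
```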

@ -118,8 +118,9 @@ func GetNotificationConversationIDByConversationID(conversationID string) string
l := strings.Split(conversationID, "_") l := strings.Split(conversationID, "_")
if len(l) > 1 { if len(l) > 1 {
l[0] = "n" l[0] = "n"
return conversationID return strings.Join(l, "_")
} }
return "" return ""
} }
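
The previous version mutated `l[0]` but then returned the original `conversationID`, so the "n" prefix was never applied; the fix joins the modified segments back together. A small sketch of the corrected behavior, using an illustrative conversation ID format:

```go
package main

import (
	"fmt"
	"strings"
)

// Corrected helper: swap the first segment for "n" and join the parts back
// together instead of returning the input unchanged.
func GetNotificationConversationIDByConversationID(conversationID string) string {
	l := strings.Split(conversationID, "_")
	if len(l) > 1 {
		l[0] = "n"
		return strings.Join(l, "_")
	}
	return ""
}

func main() {
	// "si_1_2" is only an illustrative single-chat conversation ID.
	fmt.Println(GetNotificationConversationIDByConversationID("si_1_2")) // n_1_2
	fmt.Println(GetNotificationConversationIDByConversationID("plain"))  // empty: no "_" separator
}
```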

@ -39,7 +39,7 @@ scripts/
├── demo.sh # Demonstration or example script. ├── demo.sh # Demonstration or example script.
├── docker-check-service.sh # Docker script to check services' status. ├── docker-check-service.sh # Docker script to check services' status.
├── docker-start-all.sh # Docker script to start all containers/services. ├── docker-start-all.sh # Docker script to start all containers/services.
├── ensure_tag.sh # Ensure correct tags or labeling. ├── ensure-tag.sh # Ensure correct tags or labeling.
├── env_check.sh # Environment verification and checking. ├── env_check.sh # Environment verification and checking.
├── gen-swagger-docs.sh # Script to generate Swagger documentation. ├── gen-swagger-docs.sh # Script to generate Swagger documentation.
├── genconfig.sh # Generate configuration files. ├── genconfig.sh # Generate configuration files.

@ -1,76 +0,0 @@
#!/usr/bin/env bash
# Copyright © 2023 OpenIM. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#Include shell font styles and some basic information
SCRIPTS_ROOT=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
OPENIM_ROOT=$(dirname "${SCRIPTS_ROOT}")/..
#Include shell font styles and some basic information
source $SCRIPTS_ROOT/path_info.sh
source $SCRIPTS_ROOT/lib/init.sh
cd $SCRIPTS_ROOT
echo -e "check time synchronize.................................."
t=`curl http://time.akamai.com/?iso -s`
t1=`date -d $t +%s`
t2=`date +%s`
let between=t2-t1
if [[ $between -gt 10 ]] || [[ $between -lt -10 ]]; then
echo -e ${RED_PREFIX}"Warning: The difference between the iso time and the server's time is too large: "$between"s" ${COLOR_SUFFIX}
else
echo -e ${GREEN_PREFIX} "ok: Server time is synchronized " ${COLOR_SUFFIX}
fi
echo -e "check login user........................................"
user=`whoami`
if [ $user == "root" ] ; then
echo -e ${GREEN_PREFIX} "ok: login user is root" ${COLOR_SUFFIX}
else
echo -e ${RED_PREFIX}"Warning: The current user is not root "${COLOR_SUFFIX}
fi
echo -e "check docker............................................"
docker_running=`systemctl status docker | grep running | grep active | wc -l`
docker_version=`docker-compose -v; docker -v`
if [ $docker_running -gt 0 ]; then
echo -e ${GREEN_PREFIX} "ok: docker is running" ${COLOR_SUFFIX}
echo -e ${GREEN_PREFIX} $docker_version ${COLOR_SUFFIX}
else
echo -e ${RED_PREFIX}"docker not running"${COLOR_SUFFIX}
fi
echo -e "check environment......................................."
SYSTEM=`uname -s`
if [ $SYSTEM != "Linux" ] ; then
echo -e ${RED_PREFIX}"Warning: Currently only Linux is supported"${COLOR_SUFFIX}
else
echo -e ${GREEN_PREFIX} "ok: system is linux"${COLOR_SUFFIX}
fi
echo -e "check memory............................................"
available=`free -m | grep Mem | awk '{print $NF}'`
if [ $available -lt 2000 ] ; then
echo -e ${RED_PREFIX}"Warning: Your memory not enough, available is: " "$available"m${COLOR_SUFFIX}"\c"
echo -e ${RED_PREFIX}", must be greater than 2000m"${COLOR_SUFFIX}
else
echo -e ${GREEN_PREFIX} "ok: available memory is: "$available"m${COLOR_SUFFIX}"
fi

@ -38,9 +38,8 @@ IAM_ROOT=$(dirname "${BASH_SOURCE[0]}")/../..
[[ -z ${COMMON_SOURCED} ]] && source ${IAM_ROOT}/scripts/install/common.sh [[ -z ${COMMON_SOURCED} ]] && source ${IAM_ROOT}/scripts/install/common.sh
# API Server API Address:Port # API Server API Address:Port
INSECURE_OPENIMAPI=${IAM_APISERVER_HOST}:${API_OPENIM_PORT} INSECURE_OPENIMAPI="http://${OPENIM_API_HOST}:${API_OPENIM_PORT}"
INSECURE_OPENIMAUTO=${OPENIM_RPC_AUTH_HOST}:${OPENIM_AUTH_PORT} INSECURE_OPENIMAUTO=${OPENIM_RPC_AUTH_HOST}:${OPENIM_AUTH_PORT}
CCURL="curl -f -s -XPOST" # Create CCURL="curl -f -s -XPOST" # Create
UCURL="curl -f -s -XPUT" # Update UCURL="curl -f -s -XPUT" # Update
RCURL="curl -f -s -XGET" # Retrieve RCURL="curl -f -s -XGET" # Retrieve
@ -73,7 +72,7 @@ function openim::test::auth() {
# Define a function to get a token (Admin Token) # Define a function to get a token (Admin Token)
openim::test::get_token() { openim::test::get_token() {
token_response=$(${CCURL} "${OperationID}" "${Header}" http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/auth/user_token \ token_response=$(${CCURL} "${OperationID}" "${Header}" ${INSECURE_OPENIMAPI}/auth/user_token \
-d'{"secret": "'"$SECRET"'","platformID": 1,"userID": "openIM123456"}') -d'{"secret": "'"$SECRET"'","platformID": 1,"userID": "openIM123456"}')
token=$(echo $token_response | grep -Po 'token[" :]+\K[^"]+') token=$(echo $token_response | grep -Po 'token[" :]+\K[^"]+')
echo "$token" echo "$token"
@ -94,7 +93,7 @@ EOF
) )
echo "Requesting force logout for user: $request_body" echo "Requesting force logout for user: $request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/auth/force_logout" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/auth/force_logout" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -127,7 +126,7 @@ EOF
echo "Request body for user registration: $request_body" echo "Request body for user registration: $request_body"
# Send the registration request # Send the registration request
local user_register_response=$(${CCURL} "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/user_register" -d "${request_body}") local user_register_response=$(${CCURL} "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/user_register" -d "${request_body}")
# Check for errors in the response # Check for errors in the response
openim::test::check_error "$user_register_response" openim::test::check_error "$user_register_response"
@ -148,7 +147,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/account_check" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/account_check" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -166,7 +165,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/get_users" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/get_users" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -184,7 +183,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/get_users_info" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/get_users_info" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -204,7 +203,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/get_users_online_status" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/get_users_online_status" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -223,7 +222,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/update_user_info" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/update_user_info" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -238,7 +237,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/get_subscribe_users_status" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/get_subscribe_users_status" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -259,7 +258,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/subscribe_users_status" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/subscribe_users_status" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -275,7 +274,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/set_global_msg_recv_opt" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/set_global_msg_recv_opt" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -330,7 +329,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/is_friend" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/is_friend" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -346,7 +345,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/delete_friend" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/delete_friend" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -365,7 +364,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/get_friend_apply_list" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/get_friend_apply_list" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -384,7 +383,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/get_friend_list" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/get_friend_list" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -401,7 +400,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/set_friend_remark" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/set_friend_remark" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -419,7 +418,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/add_friend" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/add_friend" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -437,7 +436,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/import_friend" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/import_friend" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -456,7 +455,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/add_friend_response" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/add_friend_response" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -475,7 +474,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/get_self_friend_apply_list" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/get_self_friend_apply_list" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -491,7 +490,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/add_black" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/add_black" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -507,7 +506,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/remove_black" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/remove_black" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -526,7 +525,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/get_black_list" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/get_black_list" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -619,7 +618,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/create_group" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/create_group" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -639,7 +638,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/invite_user_to_group" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/invite_user_to_group" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -656,7 +655,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/transfer_group" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/transfer_group" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -671,7 +670,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/get_groups_info" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/get_groups_info" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -690,7 +689,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/kick_group" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/kick_group" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -706,7 +705,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/get_group_members_info" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/get_group_members_info" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -725,7 +724,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/get_group_member_list" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/get_group_member_list" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -744,7 +743,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/get_joined_group_list" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/get_joined_group_list" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -769,7 +768,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/set_group_member_info" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/set_group_member_info" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -784,7 +783,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/mute_group" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/mute_group" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -799,7 +798,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/cancel_mute_group" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/cancel_mute_group" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -814,7 +813,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/dismiss_group" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/dismiss_group" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -830,7 +829,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/cancel_mute_group_member" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/cancel_mute_group_member" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -848,7 +847,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/join_group" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/join_group" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -873,7 +872,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/set_group_info" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/set_group_info" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -890,7 +889,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/quit_group" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/quit_group" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -909,7 +908,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/get_recv_group_applicationList" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/get_recv_group_applicationList" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -927,7 +926,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/group_application_response" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/group_application_response" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -946,7 +945,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/get_user_req_group_applicationList" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/get_user_req_group_applicationList" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -963,7 +962,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/mute_group_member" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/mute_group_member" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -981,7 +980,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/get_group_users_req_application_list" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/get_group_users_req_application_list" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -1078,7 +1077,7 @@ function openim::test::group() {
# Define a function to register a user # Define a function to register a user
openim::register_user() { openim::register_user() {
user_register_response=$(${CCURL} "${Header}" http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/user_register \ user_register_response=$(${CCURL} "${Header}" ${INSECURE_OPENIMAPI}/user/user_register \
-d'{ -d'{
"secret": "openIM123", "secret": "openIM123",
"users": [{"userID": "11111112","nickname": "yourNickname","faceURL": "yourFaceURL"}] "users": [{"userID": "11111112","nickname": "yourNickname","faceURL": "yourFaceURL"}]
@ -1090,7 +1089,7 @@ openim::register_user() {
# Define a function to check the account # Define a function to check the account
openim::test::check_account() { openim::test::check_account() {
local token=$1 local token=$1
account_check_response=$(${CCURL} "${Header}" -H"operationID: 1646445464564" -H"token: ${token}" http://localhost:${API_OPENIM_PORT}/user/account_check \ account_check_response=$(${CCURL} "${Header}" -H"operationID: 1646445464564" -H"token: ${token}" ${INSECURE_OPENIMAPI}/user/account_check \
-d'{ -d'{
"checkUserIDs": ["11111111","11111112"] "checkUserIDs": ["11111111","11111112"]
}') }')
@ -1164,7 +1163,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/msg/send_msg" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/msg/send_msg" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -1185,7 +1184,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/msg/revoke_msg" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/msg/revoke_msg" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
@ -1203,7 +1202,7 @@ EOF
) )
echo "$request_body" echo "$request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/msg/user_clear_all_msg" -d "${request_body}") local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/msg/user_clear_all_msg" -d "${request_body}")
openim::test::check_error "$response" openim::test::check_error "$response"
} }
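
With the scheme folded into `INSECURE_OPENIMAPI`, every endpoint above is built from a single base URL, so pointing the suite at another host only requires changing `OPENIM_API_HOST`/`API_OPENIM_PORT`. A rough Go equivalent of the `get_token` call is sketched here; the response field layout is an assumption (the script itself just greps the first `token` value):

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
)

func getenv(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}

func main() {
	// Same idea as INSECURE_OPENIMAPI: scheme, host, and port assembled once.
	base := fmt.Sprintf("http://%s:%s", getenv("OPENIM_API_HOST", "127.0.0.1"), getenv("API_OPENIM_PORT", "10002"))

	payload, _ := json.Marshal(map[string]any{
		"secret":     "openIM123", // the SECRET used elsewhere in these scripts
		"platformID": 1,
		"userID":     "openIM123456",
	})

	req, err := http.NewRequest(http.MethodPost, base+"/auth/user_token", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	req.Header.Set("operationID", "1646445464564")
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Assumed shape: {"data": {"token": "..."}}; adjust if the payload differs.
	var out struct {
		Data struct {
			Token string `json:"token"`
		} `json:"data"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		panic(err)
	}
	fmt.Println("admin token:", out.Data.Token)
}
```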

@ -34,7 +34,7 @@ release.tag: tools.verify.gsemver release.ensure-tag
## release.ensure-tag: ensure tag ## release.ensure-tag: ensure tag
.PHONY: release.ensure-tag .PHONY: release.ensure-tag
release.ensure-tag: tools.verify.gsemver release.ensure-tag: tools.verify.gsemver
@scripts/ensure_tag.sh @scripts/ensure-tag.sh
## release.help: Display help information about the release package ## release.help: Display help information about the release package
.PHONY: release.help .PHONY: release.help

@ -1,7 +1,10 @@
package user package user
import ( import (
"fmt"
gettoken "github.com/openimsdk/open-im-server/v3/test/e2e/api/token" gettoken "github.com/openimsdk/open-im-server/v3/test/e2e/api/token"
"github.com/openimsdk/open-im-server/v3/test/e2e/framework/config"
) )
// UserInfoRequest represents a request to get or update user information // UserInfoRequest represents a request to get or update user information
@ -17,14 +20,20 @@ type GetUsersOnlineStatusRequest struct {
// GetUsersInfo retrieves detailed information for a list of user IDs // GetUsersInfo retrieves detailed information for a list of user IDs
func GetUsersInfo(token string, userIDs []string) error { func GetUsersInfo(token string, userIDs []string) error {
url := fmt.Sprintf("http://%s:%s/user/get_users_info", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
requestBody := UserInfoRequest{ requestBody := UserInfoRequest{
UserIDs: userIDs, UserIDs: userIDs,
} }
return sendPostRequestWithToken("http://your-api-host:port/user/get_users_info", token, requestBody) return sendPostRequestWithToken(url, token, requestBody)
} }
// UpdateUserInfo updates the information for a user // UpdateUserInfo updates the information for a user
func UpdateUserInfo(token, userID, nickname, faceURL string) error { func UpdateUserInfo(token, userID, nickname, faceURL string) error {
url := fmt.Sprintf("http://%s:%s/user/update_user_info", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
requestBody := UserInfoRequest{ requestBody := UserInfoRequest{
UserInfo: &gettoken.User{ UserInfo: &gettoken.User{
UserID: userID, UserID: userID,
@ -32,13 +41,17 @@ func UpdateUserInfo(token, userID, nickname, faceURL string) error {
FaceURL: faceURL, FaceURL: faceURL,
}, },
} }
return sendPostRequestWithToken("http://your-api-host:port/user/update_user_info", token, requestBody) return sendPostRequestWithToken(url, token, requestBody)
} }
// GetUsersOnlineStatus retrieves the online status for a list of user IDs // GetUsersOnlineStatus retrieves the online status for a list of user IDs
func GetUsersOnlineStatus(token string, userIDs []string) error { func GetUsersOnlineStatus(token string, userIDs []string) error {
url := fmt.Sprintf("http://%s:%s/user/get_users_online_status", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
requestBody := GetUsersOnlineStatusRequest{ requestBody := GetUsersOnlineStatusRequest{
UserIDs: userIDs, UserIDs: userIDs,
} }
return sendPostRequestWithToken("http://your-api-host:port/user/get_users_online_status", token, requestBody)
return sendPostRequestWithToken(url, token, requestBody)
} }
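
A short usage sketch for these helpers; the token value is a placeholder (in the suite it would come from the token helpers), and the import path mirrors the package layout shown in the diff:

```go
package main

import (
	"log"

	"github.com/openimsdk/open-im-server/v3/test/e2e/api/user"
)

func main() {
	token := "ADMIN_TOKEN" // placeholder; obtained from the e2e token helpers in practice

	if err := user.GetUsersInfo(token, []string{"11111111", "11111112"}); err != nil {
		log.Fatalf("get users info: %v", err)
	}
	if err := user.UpdateUserInfo(token, "11111112", "newNickname", "https://example.com/face.png"); err != nil {
		log.Fatalf("update user info: %v", err)
	}
	if err := user.GetUsersOnlineStatus(token, []string{"11111112"}); err != nil {
		log.Fatalf("get users online status: %v", err)
	}
}
```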

@ -8,6 +8,7 @@ import (
"net/http" "net/http"
gettoken "github.com/openimsdk/open-im-server/v3/test/e2e/api/token" gettoken "github.com/openimsdk/open-im-server/v3/test/e2e/api/token"
"github.com/openimsdk/open-im-server/v3/test/e2e/framework/config"
) )
// ForceLogoutRequest represents a request to force a user logout // ForceLogoutRequest represents a request to force a user logout
@ -34,30 +35,39 @@ type Pagination struct {
// ForceLogout forces a user to log out // ForceLogout forces a user to log out
func ForceLogout(token, userID string, platformID int) error { func ForceLogout(token, userID string, platformID int) error {
url := fmt.Sprintf("http://%s:%s/auth/force_logout", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
requestBody := ForceLogoutRequest{ requestBody := ForceLogoutRequest{
PlatformID: platformID, PlatformID: platformID,
UserID: userID, UserID: userID,
} }
return sendPostRequestWithToken("http://your-api-host:port/auth/force_logout", token, requestBody) return sendPostRequestWithToken(url, token, requestBody)
} }
// CheckUserAccount checks if the user accounts exist // CheckUserAccount checks if the user accounts exist
func CheckUserAccount(token string, userIDs []string) error { func CheckUserAccount(token string, userIDs []string) error {
url := fmt.Sprintf("http://%s:%s/user/account_check", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
requestBody := CheckUserAccountRequest{ requestBody := CheckUserAccountRequest{
CheckUserIDs: userIDs, CheckUserIDs: userIDs,
} }
return sendPostRequestWithToken("http://your-api-host:port/user/account_check", token, requestBody) return sendPostRequestWithToken(url, token, requestBody)
} }
// GetUsers retrieves a list of users with pagination // GetUsers retrieves a list of users with pagination
func GetUsers(token string, pageNumber, showNumber int) error { func GetUsers(token string, pageNumber, showNumber int) error {
url := fmt.Sprintf("http://%s:%s/user/account_check", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
requestBody := GetUsersRequest{ requestBody := GetUsersRequest{
Pagination: Pagination{ Pagination: Pagination{
PageNumber: pageNumber, PageNumber: pageNumber,
ShowNumber: showNumber, ShowNumber: showNumber,
}, },
} }
return sendPostRequestWithToken("http://your-api-host:port/user/get_users", token, requestBody) return sendPostRequestWithToken(url, token, requestBody)
} }
// sendPostRequestWithToken sends a POST request with a token in the header // sendPostRequestWithToken sends a POST request with a token in the header

@ -1,6 +1,9 @@
package config package config
import "flag" import (
"flag"
"os"
)
// Flags is the flag set that AddOptions adds to. Test authors should // Flags is the flag set that AddOptions adds to. Test authors should
// also use it instead of directly adding to the global command line. // also use it instead of directly adding to the global command line.
@ -19,3 +22,49 @@ func CopyFlags(source *flag.FlagSet, target *flag.FlagSet) {
target.Var(flag.Value, flag.Name, flag.Usage) target.Var(flag.Value, flag.Name, flag.Usage)
}) })
} }
// Config defines the configuration structure for the OpenIM components.
type Config struct {
APIHost string
APIPort string
MsgGatewayHost string
MsgTransferHost string
PushHost string
RPCAuthHost string
RPCConversationHost string
RPCFriendHost string
RPCGroupHost string
RPCMsgHost string
RPCThirdHost string
RPCUserHost string
// Add other configuration fields as needed
}
// LoadConfig loads the configurations from environment variables or default values.
func LoadConfig() *Config {
return &Config{
APIHost: getEnv("OPENIM_API_HOST", "127.0.0.1"),
APIPort: getEnv("API_OPENIM_PORT", "10002"),
// TODO: Set default variable
MsgGatewayHost: getEnv("OPENIM_MSGGATEWAY_HOST", "default-msggateway-host"),
MsgTransferHost: getEnv("OPENIM_MSGTRANSFER_HOST", "default-msgtransfer-host"),
PushHost: getEnv("OPENIM_PUSH_HOST", "default-push-host"),
RPCAuthHost: getEnv("OPENIM_RPC_AUTH_HOST", "default-rpc-auth-host"),
RPCConversationHost: getEnv("OPENIM_RPC_CONVERSATION_HOST", "default-rpc-conversation-host"),
RPCFriendHost: getEnv("OPENIM_RPC_FRIEND_HOST", "default-rpc-friend-host"),
RPCGroupHost: getEnv("OPENIM_RPC_GROUP_HOST", "default-rpc-group-host"),
RPCMsgHost: getEnv("OPENIM_RPC_MSG_HOST", "default-rpc-msg-host"),
RPCThirdHost: getEnv("OPENIM_RPC_THIRD_HOST", "default-rpc-third-host"),
RPCUserHost: getEnv("OPENIM_RPC_USER_HOST", "default-rpc-user-host"),
}
}
// getEnv is a helper function to read an environment variable or return a default value.
func getEnv(key, defaultValue string) string {
value, exists := os.LookupEnv(key)
if !exists {
return defaultValue
}
return value
}
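
A brief sketch of how this configuration is consumed by the request helpers above; the endpoint path is illustrative and the environment variable is set inline only to show the override:

```go
package main

import (
	"fmt"
	"os"

	"github.com/openimsdk/open-im-server/v3/test/e2e/framework/config"
)

func main() {
	// With no environment variables set, LoadConfig falls back to 127.0.0.1:10002.
	cfg := config.LoadConfig()
	fmt.Printf("http://%s:%s/user/get_users_info\n", cfg.APIHost, cfg.APIPort)

	// Environment variables take precedence over the defaults.
	_ = os.Setenv("OPENIM_API_HOST", "10.0.0.5")
	cfg = config.LoadConfig()
	fmt.Printf("http://%s:%s/user/get_users_info\n", cfg.APIHost, cfg.APIPort)
}
```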
