Merge branch 'main' of https://github.com/openimsdk/open-im-server into chore/remove-cm

pull/3614/head
mo3et 4 weeks ago
commit ae54b7a9ce

@ -17,3 +17,13 @@ prometheus:
ports:
# This address can be accessed via a browser
grafanaURL:
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850

@ -26,3 +26,20 @@ longConnSvr:
websocketMaxMsgLen: 4096
# WebSocket connection handshake timeout in seconds
websocketTimeout: 10
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -6,3 +6,20 @@ prometheus:
# List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly
# It will only take effect when autoSetPorts is set to false.
ports:
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -10,10 +10,26 @@ rpc:
# It will only take effect when autoSetPorts is set to false.
ports:
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached
prometheus:
# Enable or disable Prometheus monitoring
enable: true
enable: false
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
# It will only take effect when autoSetPorts is set to false.
ports:

@ -20,3 +20,20 @@ prometheus:
tokenPolicy:
# Token validity period, in days
expire: 90
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -16,3 +16,20 @@ prometheus:
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
# It will only take effect when autoSetPorts is set to false.
ports:
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -16,3 +16,20 @@ prometheus:
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
# It will only take effect when autoSetPorts is set to false.
ports:
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -19,3 +19,20 @@ prometheus:
enableHistoryForNewMembers: true
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -20,3 +20,20 @@ prometheus:
# Does sending messages require friend verification
friendVerify: false
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -17,6 +17,22 @@ prometheus:
# It will only take effect when autoSetPorts is set to false.
ports:
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached
object:
# Use MinIO as object storage, or set to "cos", "oss", "kodo", "aws", while also configuring the corresponding settings

@ -16,3 +16,20 @@ prometheus:
# Prometheus listening ports, must be consistent with the number of rpc.ports
# It will only take effect when autoSetPorts is set to false.
ports:
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -12,8 +12,8 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.73-alpha.14
github.com/openimsdk/tools v0.0.50-alpha.103
github.com/openimsdk/protocol v0.0.73-alpha.17
github.com/openimsdk/tools v0.0.50-alpha.105
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.10.0
@ -135,6 +135,7 @@ require (
github.com/leodido/go-urn v1.4.0 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect
github.com/lithammer/shortuuid v3.0.0+incompatible // indirect
github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de // indirect
github.com/magefile/mage v1.15.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
@ -151,6 +152,7 @@ require (
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
@ -160,6 +162,8 @@ require (
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sercand/kuberesolver/v6 v6.0.1 // indirect
github.com/shirou/gopsutil/v3 v3.24.5 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect

@ -303,6 +303,8 @@ github.com/likexian/gokit v0.25.13 h1:p2Uw3+6fGG53CwdU2Dz0T6bOycdb2+bAFAa3ymwWVk
github.com/likexian/gokit v0.25.13/go.mod h1:qQhEWFBEfqLCO3/vOEo2EDKd+EycekVtUK4tex+l2H4=
github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w=
github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w=
github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de h1:V53FWzU6KAZVi1tPp5UIsMoUWJ2/PNwYIDXnu7QuBCE=
github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
@ -347,10 +349,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw=
github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.73-alpha.14 h1:lv9wNiPRm6G7q74TfpMobKrSfeTaBlZ+Ps3O6UFPmaE=
github.com/openimsdk/protocol v0.0.73-alpha.14/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.103 h1:jYvI86cWiVu8a8iw1panw+pwIiStuUHF76h3fxA6ESI=
github.com/openimsdk/tools v0.0.50-alpha.103/go.mod h1:qCExFBqXpQBMzZck3XGIFwivBayAn2KNqB3WAd++IJw=
github.com/openimsdk/protocol v0.0.73-alpha.17 h1:ddo0QMns1GVwAmrPIPlAQ7uKmThAYLnOt+CIOgLsJyE=
github.com/openimsdk/protocol v0.0.73-alpha.17/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.105 h1:axuCvKXhxY2RGLhpMMFNgBtE0B65T2Sr1JDW3UD9nBs=
github.com/openimsdk/tools v0.0.50-alpha.105/go.mod h1:x9i/e+WJFW4tocy6RNJQ9NofQiP3KJ1Y576/06TqOG4=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
@ -361,6 +363,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig=
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
@ -397,6 +401,12 @@ github.com/sercand/kuberesolver/v6 v6.0.1 h1:XZUTA0gy/lgDYp/UhEwv7Js24F1j8NJ833Q
github.com/sercand/kuberesolver/v6 v6.0.1/go.mod h1:C0tsTuRMONSY+Xf7pv7RMW1/JlewY1+wS8SZE+1lf1s=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI=
github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=
@ -548,6 +558,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

@ -62,3 +62,7 @@ func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) {
func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client)
}
func (o *ConversationApi) DeleteConversations(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.DeleteConversations, o.Client)
}

@ -0,0 +1,83 @@
package api
import (
"fmt"
"math"
"net/http"
"strconv"
"time"
"github.com/gin-gonic/gin"
"github.com/openimsdk/tools/apiresp"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/stability/ratelimit"
"github.com/openimsdk/tools/stability/ratelimit/bbr"
)
type RateLimiter struct {
Enable bool `yaml:"enable"`
Window time.Duration `yaml:"window"` // time duration per window
Bucket int `yaml:"bucket"` // bucket number for each window
CPUThreshold int64 `yaml:"cpuThreshold"` // CPU threshold; valid range 01000 (1000 = 100%)
}
func RateLimitMiddleware(config *RateLimiter) gin.HandlerFunc {
if !config.Enable {
return func(c *gin.Context) {
c.Next()
}
}
limiter := bbr.NewBBRLimiter(
bbr.WithWindow(config.Window),
bbr.WithBucket(config.Bucket),
bbr.WithCPUThreshold(config.CPUThreshold),
)
return func(c *gin.Context) {
status := limiter.Stat()
c.Header("X-BBR-CPU", strconv.FormatInt(status.CPU, 10))
c.Header("X-BBR-MinRT", strconv.FormatInt(status.MinRt, 10))
c.Header("X-BBR-MaxPass", strconv.FormatInt(status.MaxPass, 10))
c.Header("X-BBR-MaxInFlight", strconv.FormatInt(status.MaxInFlight, 10))
c.Header("X-BBR-InFlight", strconv.FormatInt(status.InFlight, 10))
done, err := limiter.Allow()
if err != nil {
c.Header("X-RateLimit-Policy", "BBR")
c.Header("Retry-After", calculateBBRRetryAfter(status))
c.Header("X-RateLimit-Limit", strconv.FormatInt(status.MaxInFlight, 10))
c.Header("X-RateLimit-Remaining", "0") // There is no concept of remaining quota in BBR.
fmt.Println("rate limited:", err, "path:", c.Request.URL.Path)
log.ZWarn(c, "rate limited", err, "path", c.Request.URL.Path)
c.AbortWithStatus(http.StatusTooManyRequests)
apiresp.GinError(c, errs.NewCodeError(http.StatusTooManyRequests, "too many requests, please try again later"))
return
}
c.Next()
done(ratelimit.DoneInfo{})
}
}
func calculateBBRRetryAfter(status bbr.Stat) string {
loadRatio := float64(status.CPU) / float64(status.CPU)
if loadRatio < 0.8 {
return "1"
}
if loadRatio < 0.95 {
return "2"
}
backoff := 1 + int64(math.Pow(loadRatio-0.95, 2)*50)
if backoff > 5 {
backoff = 5
}
return strconv.FormatInt(backoff, 10)
}

@ -97,6 +97,18 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
case BestSpeed:
r.Use(gzip.Gzip(gzip.BestSpeed))
}
// Use rate limiter middleware
if cfg.API.RateLimiter.Enable {
rl := &RateLimiter{
Enable: cfg.API.RateLimiter.Enable,
Window: cfg.API.RateLimiter.Window,
Bucket: cfg.API.RateLimiter.Bucket,
CPUThreshold: cfg.API.RateLimiter.CPUThreshold,
}
r.Use(RateLimitMiddleware(rl))
}
if config.Standalone() {
r.Use(func(c *gin.Context) {
c.Set(authverify.CtxAdminUserIDsKey, cfg.Share.IMAdminUser.UserIDs)
@ -277,6 +289,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation)
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs)
conversationGroup.POST("/delete_conversations", c.DeleteConversations)
}
{

@ -13,6 +13,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
pbAuth "github.com/openimsdk/protocol/auth"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/mcontext"
"github.com/go-playground/validator/v10"
@ -64,6 +65,8 @@ type WsServer struct {
webhookClient *webhook.Client
userClient *rpcli.UserClient
authClient *rpcli.AuthClient
ready atomic.Bool
}
type kickHandler struct {
@ -93,6 +96,8 @@ func (ws *WsServer) SetDiscoveryRegistry(ctx context.Context, disCov discovery.C
ws.authClient = rpcli.NewAuthClient(authConn)
ws.MessageHandler = NewGrpcHandler(ws.validate, rpcli.NewMsgClient(msgConn), rpcli.NewPushMsgServiceClient(pushConn))
ws.disCov = disCov
ws.ready.Store(true)
return nil
}
@ -437,6 +442,11 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
// Create a new connection context
connContext := newContext(w, r)
if !ws.ready.Load() {
httpError(connContext, errs.New("ws server not ready"))
return
}
// Check if the current number of online user connections exceeds the maximum limit
if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum {
// If it exceeds the maximum connection number, return an error via HTTP and stop processing
@ -453,6 +463,11 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
return
}
if ws.authClient == nil {
httpError(connContext, errs.New("auth client is not initialized"))
return
}
// Call the authentication client to parse the Token obtained from the context
resp, err := ws.authClient.ParseToken(connContext, connContext.GetToken())
if err != nil {

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"github.com/openimsdk/tools/mq"
"sync"

@ -23,6 +23,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
@ -781,7 +782,7 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *
}
latestMsgDestructTime := time.UnixMilli(req.Timestamp)
for i, conversation := range conversations {
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
if !conversation.IsMsgDestruct || conversation.MsgDestructTime == 0 {
continue
}
seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-(conversation.MsgDestructTime*1000))
@ -821,3 +822,53 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c
c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID})
return nil
}
func (c *conversationServer) DeleteConversations(ctx context.Context, req *pbconversation.DeleteConversationsReq) (resp *pbconversation.DeleteConversationsResp, err error) {
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
return nil, err
}
if req.NeedDeleteTime == 0 && len(req.ConversationIDs) == 0 {
return nil, errs.ErrArgs.WrapMsg("need_delete_time or conversationIDs need be set")
}
if req.NeedDeleteTime != 0 && len(req.ConversationIDs) != 0 {
return nil, errs.ErrArgs.WrapMsg("need_delete_time and conversationIDs cannot both be set")
}
var needDeleteConversationIDs []string
if len(req.ConversationIDs) == 0 {
deleteTimeThreshold := time.Now().AddDate(0, 0, -int(req.NeedDeleteTime)).UnixMilli()
conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.OwnerUserID)
if err != nil {
return nil, err
}
latestMsgs, err := c.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{
UserID: req.OwnerUserID,
ConversationIDs: conversationIDs,
})
if err != nil {
return nil, err
}
for conversationID, msg := range latestMsgs.Msgs {
if msg.SendTime < deleteTimeThreshold {
needDeleteConversationIDs = append(needDeleteConversationIDs, conversationID)
}
}
if len(needDeleteConversationIDs) == 0 {
return &pbconversation.DeleteConversationsResp{}, nil
}
} else {
needDeleteConversationIDs = req.ConversationIDs
}
if err := c.conversationDatabase.DeleteUsersConversations(ctx, req.OwnerUserID, needDeleteConversationIDs); err != nil {
return nil, err
}
// c.conversationNotificationSender.ConversationDeleteNotification(ctx, req.OwnerUserID, needDeleteConversationIDs)
return &pbconversation.DeleteConversationsResp{}, nil
}

@ -59,3 +59,12 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification(
c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips)
}
func (c *ConversationNotificationSender) ConversationDeleteNotification(ctx context.Context, userID string, conversationIDs []string) {
tips := &sdkws.ConversationDeleteTips{
UserID: userID,
ConversationIDs: conversationIDs,
}
c.Notification(ctx, userID, userID, constant.ConversationDeleteNotification, tips)
}

@ -67,6 +67,9 @@ func (a *ApiCmd) runE() error {
}
return startrpc.Start(
a.ctx, &a.apiConfig.Discovery,
nil,
nil,
// &a.apiConfig.API.RateLimiter,
&prometheus,
a.apiConfig.API.Api.ListenIP, "",
a.apiConfig.API.Prometheus.AutoSetPorts,

@ -43,7 +43,7 @@ func (a *AuthRpcCmd) Exec() error {
}
func (a *AuthRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.CircuitBreaker, &a.authConfig.RpcConfig.RateLimiter, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports,
a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig,
[]string{

@ -44,7 +44,7 @@ func (a *ConversationRpcCmd) Exec() error {
}
func (a *ConversationRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.CircuitBreaker, &a.conversationConfig.RpcConfig.RateLimiter, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports,
a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig,
[]string{

@ -42,6 +42,8 @@ func (a *CronTaskCmd) runE() error {
var prometheus config.Prometheus
return startrpc.Start(
a.ctx, &a.cronTaskConfig.Discovery,
nil,
nil,
&prometheus,
"", "",
true,

@ -44,7 +44,7 @@ func (a *FriendRpcCmd) Exec() error {
}
func (a *FriendRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.CircuitBreaker, &a.relationConfig.RpcConfig.RateLimiter, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP,
a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports,
a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig,
[]string{

@ -45,7 +45,7 @@ func (a *GroupRpcCmd) Exec() error {
}
func (a *GroupRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.CircuitBreaker, &a.groupConfig.RpcConfig.RateLimiter, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports,
a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig,
[]string{

@ -45,7 +45,7 @@ func (a *MsgRpcCmd) Exec() error {
}
func (a *MsgRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.CircuitBreaker, &a.msgConfig.RpcConfig.RateLimiter, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports,
a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig,
[]string{

@ -47,6 +47,8 @@ func (m *MsgGatewayCmd) runE() error {
var prometheus config.Prometheus
return startrpc.Start(
m.ctx, &m.msgGatewayConfig.Discovery,
&m.msgGatewayConfig.MsgGateway.CircuitBreaker,
&m.msgGatewayConfig.MsgGateway.RateLimiter,
&prometheus,
rpc.ListenIP, rpc.RegisterIP,
rpc.AutoSetPorts,

@ -48,6 +48,8 @@ func (m *MsgTransferCmd) runE() error {
var prometheus config.Prometheus
return startrpc.Start(
m.ctx, &m.msgTransferConfig.Discovery,
&m.msgTransferConfig.MsgTransfer.CircuitBreaker,
&m.msgTransferConfig.MsgTransfer.RateLimiter,
&prometheus,
"", "",
true,

@ -46,7 +46,7 @@ func (a *PushRpcCmd) Exec() error {
}
func (a *PushRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.CircuitBreaker, &a.pushConfig.RpcConfig.RateLimiter, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports,
a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig,
[]string{

@ -44,7 +44,7 @@ func (a *ThirdRpcCmd) Exec() error {
}
func (a *ThirdRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.CircuitBreaker, &a.thirdConfig.RpcConfig.RateLimiter, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports,
a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig,
[]string{

@ -45,7 +45,7 @@ func (a *UserRpcCmd) Exec() error {
}
func (a *UserRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.CircuitBreaker, &a.userConfig.RpcConfig.RateLimiter, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports,
a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig,
[]string{

@ -129,6 +129,23 @@ type API struct {
Ports []int `yaml:"ports"`
GrafanaURL string `yaml:"grafanaURL"`
} `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
}
type RateLimiter struct {
Enable bool `yaml:"enable"`
Window time.Duration `yaml:"window"`
Bucket int `yaml:"bucket"`
CPUThreshold int64 `yaml:"cpuThreshold"`
}
type CircuitBreaker struct {
Enable bool `yaml:"enable"`
Window time.Duration `yaml:"window"`
Bucket int `yaml:"bucket"`
Success float64 `yaml:"success"`
Request int64 `yaml:"request"`
}
type CronTask struct {
@ -203,6 +220,8 @@ type MsgGateway struct {
WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"`
WebsocketTimeout int `yaml:"websocketTimeout"`
} `yaml:"longConnSvr"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type MsgTransfer struct {
@ -211,6 +230,8 @@ type MsgTransfer struct {
AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"`
} `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Push struct {
@ -241,7 +262,9 @@ type Push struct {
BadgeCount bool `yaml:"badgeCount"`
Production bool `yaml:"production"`
} `yaml:"iosPush"`
FullUserCache bool `yaml:"fullUserCache"`
FullUserCache bool `yaml:"fullUserCache"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Auth struct {
@ -250,28 +273,38 @@ type Auth struct {
TokenPolicy struct {
Expire int64 `yaml:"expire"`
} `yaml:"tokenPolicy"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Conversation struct {
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Friend struct {
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Group struct {
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Msg struct {
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
FriendVerify bool `yaml:"friendVerify"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
FriendVerify bool `yaml:"friendVerify"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Third struct {
@ -284,6 +317,8 @@ type Third struct {
Kodo Kodo `yaml:"kodo"`
Aws Aws `yaml:"aws"`
} `yaml:"object"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type Cos struct {
BucketURL string `yaml:"bucketURL"`
@ -322,8 +357,10 @@ type Aws struct {
}
type User struct {
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
}
type RPC struct {

@ -0,0 +1,107 @@
package startrpc
import (
"context"
"time"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/stability/circuitbreaker"
"github.com/openimsdk/tools/stability/circuitbreaker/sre"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type CircuitBreaker struct {
Enable bool `yaml:"enable"`
Success float64 `yaml:"success"` // success rate threshold (0.0-1.0)
Request int64 `yaml:"request"` // request threshold
Bucket int `yaml:"bucket"` // number of buckets
Window time.Duration `yaml:"window"` // time window for statistics
}
func NewCircuitBreaker(config *CircuitBreaker) circuitbreaker.CircuitBreaker {
if !config.Enable {
return nil
}
return sre.NewSREBraker(
sre.WithWindow(config.Window),
sre.WithBucket(config.Bucket),
sre.WithSuccess(config.Success),
sre.WithRequest(config.Request),
)
}
func UnaryCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc.ServerOption {
if breaker == nil {
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
return handler(ctx, req)
})
}
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
if err := breaker.Allow(); err != nil {
log.ZWarn(ctx, "rpc circuit breaker open", err, "method", info.FullMethod)
return nil, status.Error(codes.Unavailable, "service unavailable due to circuit breaker")
}
resp, err = handler(ctx, req)
if err != nil {
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.OK:
breaker.MarkSuccess()
case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied:
breaker.MarkSuccess()
default:
breaker.MarkFailed()
}
} else {
breaker.MarkFailed()
}
} else {
breaker.MarkSuccess()
}
return resp, err
})
}
func StreamCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc.ServerOption {
if breaker == nil {
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, ss)
})
}
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if err := breaker.Allow(); err != nil {
log.ZWarn(ss.Context(), "rpc circuit breaker open", err, "method", info.FullMethod)
return status.Error(codes.Unavailable, "service unavailable due to circuit breaker")
}
err := handler(srv, ss)
if err != nil {
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.OK:
breaker.MarkSuccess()
case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied:
breaker.MarkSuccess()
default:
breaker.MarkFailed()
}
} else {
breaker.MarkFailed()
}
} else {
breaker.MarkSuccess()
}
return err
})
}

@ -0,0 +1,70 @@
package startrpc
import (
"context"
"time"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/stability/ratelimit"
"github.com/openimsdk/tools/stability/ratelimit/bbr"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type RateLimiter struct {
Enable bool
Window time.Duration
Bucket int
CPUThreshold int64
}
func NewRateLimiter(config *RateLimiter) ratelimit.Limiter {
if !config.Enable {
return nil
}
return bbr.NewBBRLimiter(
bbr.WithWindow(config.Window),
bbr.WithBucket(config.Bucket),
bbr.WithCPUThreshold(config.CPUThreshold),
)
}
func UnaryRateLimitInterceptor(limiter ratelimit.Limiter) grpc.ServerOption {
if limiter == nil {
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
return handler(ctx, req)
})
}
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
done, err := limiter.Allow()
if err != nil {
log.ZWarn(ctx, "rpc rate limited", err, "method", info.FullMethod)
return nil, status.Errorf(codes.ResourceExhausted, "rpc request rate limit exceeded: %v, please try again later", err)
}
defer done(ratelimit.DoneInfo{})
return handler(ctx, req)
})
}
func StreamRateLimitInterceptor(limiter ratelimit.Limiter) grpc.ServerOption {
if limiter == nil {
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, ss)
})
}
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
done, err := limiter.Allow()
if err != nil {
log.ZWarn(ss.Context(), "rpc rate limited", err, "method", info.FullMethod)
return status.Errorf(codes.ResourceExhausted, "rpc request rate limit exceeded: %v, please try again later", err)
}
defer done(ratelimit.DoneInfo{})
return handler(srv, ss)
})
}

@ -33,7 +33,7 @@ func init() {
prommetrics.RegistryAll()
}
func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
func Start[T any](ctx context.Context, disc *conf.Discovery, circuitBreakerConfig *conf.CircuitBreaker, rateLimiterConfig *conf.RateLimiter, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
watchConfigNames []string, watchServiceNames []string,
rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error,
@ -70,6 +70,45 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c
}
}
if circuitBreakerConfig != nil && circuitBreakerConfig.Enable {
cb := &CircuitBreaker{
Enable: circuitBreakerConfig.Enable,
Success: circuitBreakerConfig.Success,
Request: circuitBreakerConfig.Request,
Bucket: circuitBreakerConfig.Bucket,
Window: circuitBreakerConfig.Window,
}
breaker := NewCircuitBreaker(cb)
options = append(options,
UnaryCircuitBreakerInterceptor(breaker),
StreamCircuitBreakerInterceptor(breaker),
)
log.ZInfo(ctx, "RPC circuit breaker enabled",
"service", rpcRegisterName,
"window", circuitBreakerConfig.Window,
"bucket", circuitBreakerConfig.Bucket,
"success", circuitBreakerConfig.Success,
"requestThreshold", circuitBreakerConfig.Request)
}
if rateLimiterConfig != nil && rateLimiterConfig.Enable {
limiter := NewRateLimiter((*RateLimiter)(rateLimiterConfig))
options = append(options,
UnaryRateLimitInterceptor(limiter),
StreamRateLimitInterceptor(limiter),
)
log.ZInfo(ctx, "RPC rate limiter enabled",
"service", rpcRegisterName,
"window", rateLimiterConfig.Window,
"bucket", rateLimiterConfig.Bucket,
"cpuThreshold", rateLimiterConfig.CPUThreshold)
}
registerIP, err := network.GetRpcRegisterIP(registerIP)
if err != nil {
return err
@ -109,7 +148,7 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c
go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
select {
case <-ctx.Done():
return

@ -2,6 +2,7 @@ package cache
import (
"context"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
)
@ -43,7 +44,7 @@ type ConversationCache interface {
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache
DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache
DelUserPinnedConversations(userIDs ...string) ConversationCache
DelConversationVersionUserIDs(userIDs ...string) ConversationCache
FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)

@ -253,7 +253,7 @@ func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs
return cache
}
func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache {
func (c *ConversationRedisCache) DelUserPinnedConversations(userIDs ...string) cache.ConversationCache {
cache := c.CloneConversationCache()
for _, userID := range userIDs {
cache.AddKeys(c.getPinnedConversationIDsKey(userID))

@ -64,6 +64,8 @@ type ConversationDatabase interface {
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
// FindRandConversation finds random conversations based on the specified timestamp and limit.
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error)
DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error)
}
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
@ -106,7 +108,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
}
if _, ok := fieldMap["is_pinned"]; ok {
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
cache = cache.DelUserPinnedConversations(userIDs...)
}
cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
}
@ -158,7 +160,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context,
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
}
if _, ok := args["is_pinned"]; ok {
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
cache = cache.DelUserPinnedConversations(userIDs...)
}
return cache.ChainExecDel(ctx)
}
@ -189,7 +191,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
DelUserConversationIDsHash(userIDs...).
DelConversationVersionUserIDs(userIDs...).
DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).
DelConversationPinnedMessageUserIDs(pinnedUserIDs...).
DelUserPinnedConversations(pinnedUserIDs...).
ChainExecDel(ctx)
}
@ -245,7 +247,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs
cache := c.cache.CloneConversationCache()
cache = cache.DelConversationVersionUserIDs(ownerUserID).
DelConversationNotNotifyMessageUserIDs(ownerUserID).
DelConversationPinnedMessageUserIDs(ownerUserID)
DelUserPinnedConversations(ownerUserID)
groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
return e.GroupID, e.GroupID != ""
@ -415,3 +417,21 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use
func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) {
return c.conversationDB.FindRandConversation(ctx, ts, limit)
}
func (c *conversationDatabase) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) {
return c.tx.Transaction(ctx, func(ctx context.Context) error {
err = c.conversationDB.DeleteUsersConversations(ctx, userID, conversationIDs)
if err != nil {
return err
}
cache := c.cache.CloneConversationCache()
cache = cache.DelConversations(userID, conversationIDs...).
DelConversationVersionUserIDs(userID).
DelConversationIDs(userID).
DelUserConversationIDsHash(userID).
DelConversationNotNotifyMessageUserIDs(userID).
DelUserPinnedConversations(userID)
return cache.ChainExecDel(ctx)
})
}

@ -30,4 +30,5 @@ type Conversation interface {
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error)
DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error)
}

@ -294,3 +294,20 @@ func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, li
}
return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline)
}
func (c *ConversationMgo) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) {
if len(conversationIDs) == 0 {
return nil
}
return mongoutil.IncrVersion(func() error {
err := mongoutil.DeleteMany(ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": bson.M{"$in": conversationIDs}})
return err
}, func() error {
for _, conversationID := range conversationIDs {
if err := c.version.IncrVersion(ctx, userID, []string{conversationID}, model.VersionStateDelete); err != nil {
return err
}
}
return nil
})
}

Loading…
Cancel
Save