fix: fix the conflect

pull/2001/head
luhaoling 2 years ago
commit ef5f018ca8

@ -92,6 +92,7 @@ jobs:
- name: Exec OpenIM API test
run: |
sudo make test-api
mkdir -p ./tmp
touch ./tmp/test.md
echo "# OpenIM Test" >> ./tmp/test.md
@ -104,6 +105,7 @@ jobs:
- name: Exec OpenIM E2E Test
run: |
sudo make test-e2e
echo "" >> ./tmp/test.md
echo "## OpenIM E2E Test" >> ./tmp/test.md
echo "<details><summary>Command Output for OpenIM E2E Test</summary>" >> ./tmp/test.md

@ -74,6 +74,17 @@ jobs:
echo $latest_tag > pkg/common/config/version
continue-on-error: true
- name: Gen CHANGELOG file
run: |
current_tag=$(git describe --tags --abbrev=0)
version=$(echo "$current_tag" | sed -E 's/^v?([0-9]+)\.([0-9]+)\..*$/\1.\2/')
echo "OpenIM Version: $version"
make tools.install.git-chglog
cd CHANGELOG
git-chglog --tag-filter-pattern "v${version}.*" -o CHANGELOG-${version}.md
cd ..
continue-on-error: true
- name: Run unit test and get test coverage
run: |
make cover

@ -40,10 +40,14 @@ run:
# "/" will be replaced by current OS file path separator to properly work
# on Windows.
skip-dirs:
- components
- docs
- util
- .*~
- api/swagger/docs
- server/docs
- components/mnt/config/certs
- logs
# default is true. Enables skipping of directories:
# vendor$, third_party$, testdata$, examples$, Godeps$, builtin$
@ -58,6 +62,12 @@ run:
skip-files:
- ".*\\.my\\.go$"
- _test.go
- ".*_test.go"
- "mocks/"
- ".github/"
- "logs/"
- "_output/"
- "components/"
# by default isn't set. If set we pass it to "go list -mod={option}". From "go help modules":
# If invoked with -mod=readonly, the go command is disallowed from the implicit
@ -108,7 +118,6 @@ linters-settings:
right-to-left-isolate: true
first-strong-isolate: true
pop-directional-isolate: true
dogsled:
# checks assignments with too many blank identifiers; default is 2
max-blank-identifiers: 2
dupl:
@ -418,7 +427,7 @@ linters-settings:
govet:
# report about shadowed variables
check-shadowing: true
check-shadowing: false
# settings per analyzer
settings:
@ -489,9 +498,9 @@ linters-settings:
- github.com\/user\/package\/v4\.Type
lll:
# max line length, lines longer will be reported. Default is 120.
# max line length, lines longer will be reported. Default is 250.
# '\t' is counted as 1 character by default, and can be changed with the tab-width option
line-length: 240
line-length: 250
# tab width in spaces. Default to 1.
tab-width: 4
maligned:
@ -715,17 +724,34 @@ linters:
# enable-all: true
disable-all: true
enable:
- typecheck # 基本的类型检查
- gofmt # 格式检查
- govet # Go 语言的标准检查工具
- gosimple # 简化代码的建议
- misspell # 拼写错误
- staticcheck # 静态检查
- unused # 未使用的代码检查
- goimports # 检查导入是否正确排序和格式化
- godot # 注释句点检查
- bodyclose # 确保 HTTP response body 被关闭
- errcheck # 检查是否遗漏了错误返回值
- typecheck # Basic type checking
- gofmt # Format check
- govet # Go's standard linting tool
- gosimple # Suggestions for simplifying code
- errcheck
- decorder
- sloglint
- ineffassign
- revive
- reassign
- tparallel
- unconvert
- dupl
- dupword
- errname
- gci
- goheader
- goprintffuncname
- gosec
- misspell # Spelling mistakes
- staticcheck # Static analysis
- unused # Checks for unused code
- goimports # Checks if imports are correctly sorted and formatted
- godot # Checks for comment punctuation
- bodyclose # Ensures HTTP response body is closed
- stylecheck # Style checker for Go code
- unused # Checks for unused code
- errcheck # Checks for missed error returns
fast: true
issues:
@ -792,6 +818,11 @@ issues:
- lll
source: "^//go:generate "
- text: ".*[\u4e00-\u9fa5]+.*"
linters:
- golint
source: "^//.*$"
# Independently from option `exclude` we use default exclude patterns,
# it can be disabled by this option. To list all
# excluded by default patterns execute `golangci-lint run --help`.

@ -8,6 +8,12 @@
## [Unreleased]
<a name="v3.5.1-alpha.2"></a>
## [v3.5.1-alpha.2] - 2024-01-26
<a name="v3.5.1-rc.1"></a>
## [v3.5.1-rc.1] - 2024-01-23
<a name="v3.5.1-alpha.1"></a>
## [v3.5.1-alpha.1] - 2024-01-09
@ -59,7 +65,9 @@
- Merge branch 'tuoyun'
[Unreleased]: https://github.com/openimsdk/open-im-server/compare/v3.5.1-alpha.1...HEAD
[Unreleased]: https://github.com/openimsdk/open-im-server/compare/v3.5.1-alpha.2...HEAD
[v3.5.1-alpha.2]: https://github.com/openimsdk/open-im-server/compare/v3.5.1-rc.1...v3.5.1-alpha.2
[v3.5.1-rc.1]: https://github.com/openimsdk/open-im-server/compare/v3.5.1-alpha.1...v3.5.1-rc.1
[v3.5.1-alpha.1]: https://github.com/openimsdk/open-im-server/compare/v3.5.0...v3.5.1-alpha.1
[v3.5.0]: https://github.com/openimsdk/open-im-server/compare/v3.5.1...v3.5.0
[v3.5.1]: https://github.com/openimsdk/open-im-server/compare/v3.5.1-bate.1...v3.5.1

@ -184,7 +184,7 @@ test-e2e:
imports:
@$(MAKE) go.imports
## clean: Remove all files that are created by building. ✨
## clean: Delete all files created by the build, as well as all log files. ✨
.PHONY: clean
clean:
@$(MAKE) go.clean

@ -125,6 +125,8 @@ We support many platforms. Here are the addresses for quick experience on the we
## :hammer_and_wrench: To Start Developing OpenIM
[![Open in GitHub Codespaces](https://github.com/codespaces/badge.svg)](https://codespaces.new/openimsdk/open-im-server)
[![Open in Dev Container](https://img.shields.io/static/v1?label=Dev%20Container&message=Open&color=blue&logo=visualstudiocode)](https://vscode.dev/github/openimsdk/open-im-server)
OpenIM Our goal is to build a top-level open source community. We have a set of standards, in the [Community repository](https://github.com/OpenIMSDK/community).

@ -53,7 +53,7 @@
},
"id": 16,
"panels": [],
"title": "openim自定义指标",
"title": "openim Custom Metrics",
"type": "row"
},
{
@ -144,7 +144,7 @@
"refId": "A"
}
],
"title": "在线人数",
"title": "Online population",
"type": "timeseries"
},
{
@ -235,7 +235,7 @@
"refId": "A"
}
],
"title": "登入/注册人数",
"title": "Login/registration numbers",
"type": "timeseries"
},
{
@ -1345,7 +1345,7 @@
"type": "timeseries"
}
],
"title": "应用服务器流量指标",
"title": "Traffic indicators of the application server",
"type": "row"
}
],

@ -3,7 +3,15 @@
- [Code conventions](#code-conventions)
- [POSIX shell](#posix-shell)
- [Go](#go)
- [Directory and file conventions](#directory-and-file-conventions)
- [OpenIM Naming Conventions Guide](#openim-naming-conventions-guide)
- [1. General File Naming](#1-general-file-naming)
- [2. Special File Types](#2-special-file-types)
- [a. Script and Markdown Files](#a-script-and-markdown-files)
- [b. Uppercase Markdown Documentation](#b-uppercase-markdown-documentation)
- [3. Directory Naming](#3-directory-naming)
- [4. Configuration Files](#4-configuration-files)
- [Best Practices](#best-practices)
- [Directory and File Conventions](#directory-and-file-conventions)
- [Testing conventions](#testing-conventions)
## POSIX shell
@ -67,12 +75,13 @@ Files within the OpenIM project should adhere to the following rules:
+ Stick to lowercase naming where possible for consistency and to prevent issues with case-sensitive systems.
+ Include version numbers or dates in file names if the file is subject to updates, following the format: `project-plan-v1.2.md` or `backup-2023-03-15.sql`.
## Directory and file conventions
## Directory and File Conventions
- Avoid generic utility packages. Instead of naming a package "util", choose a name that clearly describes its purpose. For instance, functions related to waiting operations are contained within the `wait` package, which includes methods like `Poll`, fully named as `wait.Poll`.
- All filenames, script files, configuration files, and directories should be in lowercase and use dashes (`-`) as separators.
- For Go language files, filenames should be in lowercase and use underscores (`_`).
- Package names should match their directory names to ensure consistency. For example, within the `openim-api` directory, the Go file should be named `openim-api.go`, following the convention of using dashes for directory names and aligning package names with directory names.
- Avoid general utility packages. Packages called "util" are suspect. Instead, derive a name that describes your desired function. For example, the utility functions dealing with waiting for operations are in the `wait` package and include functionality like `Poll`. The full name is `wait.Poll`.
- All filenames should be lowercase.
- All source files and directories should use underscores, not dashes.
- Package directories should generally avoid using separators as much as possible. When package names are multiple words, they usually should be in nested subdirectories.
## Testing conventions

@ -449,7 +449,7 @@ This section involves configuring the log settings, including storage location,
| Parameter | Example Value | Description |
| ------------------------- | ------------------------ | --------------------------------- |
| LOG_STORAGE_LOCATION | "${OPENIM_ROOT}/logs/" | Location for storing logs |
| LOG_STORAGE_LOCATION | "${OPENIM_ROOT}/_output/logs/" | Location for storing logs |
| LOG_ROTATION_TIME | "24" | Log rotation time (in hours) |
| LOG_REMAIN_ROTATION_COUNT | "2" | Number of log rotations to retain |
| LOG_REMAIN_LOG_LEVEL | "6" | Log level to retain |

@ -246,6 +246,7 @@ if err != nil {
- Bug description suggestions
- Error descriptions start with a lowercase letter and do not end with punctuation, for example:
```go
// bad
errors.New("Redis connection failed")
@ -254,6 +255,7 @@ errors.New("redis connection failed.")
// good
errors.New("redis connection failed")
```
- Tell users what they can do, not what they can't.
- When declaring a requirement, use must instead of should. For example, `must be greater than 0, must match regex '[a-z]+'`.
- When declaring that a format is incorrect, use must not. For example, `must not contain`.
@ -359,7 +361,7 @@ u := User{
For example:
```
```go
// Seeking to an offset before the start of the file is an error.
// Seeking to any positive offset is legal, but the behavior of subsequent
// I/O operations on the underlying object are implementation-dependent.
@ -386,7 +388,7 @@ Writer
Some common nouns are listed below.
```
```go
// A GonicMapper that contains a list of common initialisms taken from golang/lint
var LintGonicMapper = GonicMapper{
"API": true,
@ -523,6 +525,7 @@ package genericclioptions
// ErrSigningMethod defines invalid signing method error.
var ErrSigningMethod = errors. New("Invalid signing method")
```
- When there is a large block of constant or variable definition, you can comment a general description in front, and then comment the definition of the constant in detail before or at the end of each line of constant, for example:
```go
// Code must start with 1xxxxx.
@ -888,6 +891,7 @@ type LogHandler struct {
}
var_http.Handler = LogHandler{}
```
- When the server processes a request, it should create a context, save the relevant information of the request (such as requestID), and pass it in the function call chain.
### 9.1 Performance
@ -900,3 +904,246 @@ var_http.Handler = LogHandler{}
- If you want to directly modify the value of the map, the value can only be a pointer, otherwise the original value must be overwritten.
- map needs to be locked during concurrency.
- The conversion of interface{} cannot be checked during compilation, it can only be checked at runtime, be careful to cause panic.
## 10 Golang CI Lint
- Golang CI Lint is a fast Go linters runner. It runs linters in parallel, uses caching, and works well with all environments, including CI.
**In local development, you can use the following command to install Golang CI Lint: **
```bash
make lint
```
**In CI/CD, Check the Github Actions status code below after you submit the code directly**
[![OpenIM golangci-lint](https://github.com/openimsdk/open-im-server/actions/workflows/golangci-lint.yml/badge.svg)](https://github.com/openimsdk/open-im-server/actions/workflows/golangci-lint.yml)
golangci lint can select the types of tools, refer to the official documentation: [https://golangci-lint.run/usage/linters/](https://golangci-lint.run/usage/linters/)
The types of comments we currently use include: [https://github.com/openimsdk/open-im-server/blob/main/.golangci.yml](https://github.com/openimsdk/open-im-server/blob/main/.golangci.yml) the `linters.enable` field in the file.
e.g:
```yaml
linters:
# please, do not use `enable-all`: it's deprecated and will be removed soon.
# inverted configuration with `enable-all` and `disable` is not scalable during updates of golangci-lint
# enable-all: true
disable-all: true
enable:
- typecheck # Basic type checking
- gofmt # Format check
- govet # Go's standard linting tool
- gosimple # Suggestions for simplifying code
- misspell # Spelling mistakes
- staticcheck # Static analysis
- unused # Checks for unused code
- goimports # Checks if imports are correctly sorted and formatted
- godot # Checks for comment punctuation
- bodyclose # Ensures HTTP response body is closed
- errcheck # Checks for missed error returns
fast: true
```
Add that Chinese comments are not allowed in go code, please write a complete golangci lint specification on the basis of the above.
### 10.1 Configuration Document
This configuration document is designed to configure the operational parameters of OpenIM (a hypothetical or specific code analysis tool), customize output formats, and provide detailed settings for specific code checkers (linters). Below is a summary of the document drafted based on the provided configuration information.
#### 10.1 Runtime Options
- **Concurrency** (`concurrency`): Default to use the available CPU count, can be manually set to 4 for parallel analysis.
- **Timeout** (`timeout`): Timeout duration for analysis operations, default is 1 minute, set here to 5 minutes.
- **Issue Exit Code** (`issues-exit-code`): Exit code defaults to 1 if at least one issue is found.
- **Test Files** (`tests`): Whether to include test files, defaults to true.
- **Build Tags** (`build-tags`): Specify build tags used by all linters, defaults to an empty list. Example adds `mytag`.
- **Skip Directories** (`skip-dirs`): Configure which directories' issues are not reported, defaults to empty, but some default directories are independently skipped.
- **Skip Files** (`skip-files`): Specify files where issues should not be reported, supports regular expressions.
#### 10.2 Output Configuration
- **Format** (`format`): Set output format, default is "colored-line-number".
- **Print Issued Lines** (`print-issued-lines`): Whether to print the lines where issues occur, defaults to true.
- **Print Linter Name** (`print-linter-name`): Whether to print the linter name at the end of issue text, defaults to true.
- **Uniqueness Filter** (`uniq-by-line`): Whether to make issue outputs unique per line, defaults to true.
- **Path Prefix** (`path-prefix`): Prefix to add to output file references, defaults to no prefix.
- **Sort Results** (`sort-results`): Sort results by file path, line number, and column number.
#### 10.3 Linters Settings
In the configuration file, the `linters-settings` section allows detailed configuration of individual linters. Below are examples of specific linters settings and their purposes:
- **bidichk**: Used to check bidirectional text characters, ensuring correct display direction of text, especially when dealing with mixed left-to-right (LTR) and right-to-left (RTL) text.
- **dogsled**: Monitors excessive use of blank identifiers (`_`) in assignment operations, which may obscure data processing errors or unclear logic.
- **dupl**: Identifies duplicate code blocks, helping developers avoid code redundancy. The `threshold` parameter in settings allows adjustment of code similarity threshold triggering warnings.
- **errcheck**: Checks for unhandled errors. In Go, error handling is achieved by checking function return values. This linter helps ensure all errors are properly handled.
- **exhaustive**: Checks if `switch` statements include all possible values of an enum type, ensuring exhaustiveness of code. This helps avoid forgetting to handle certain cases.
#### 10.4 Example: `errcheck`
**Incorrect Code Example**:
```go
package main
import (
"fmt"
"os"
)
func main() {
f, _ := os.Open("filename.ext")
defer f.Close()
}
```
**Issue**: In the above code, the error return value of `os.Open` function is explicitly ignored. This is a common mistake as it may lead to unhandled errors and hard-to-trace bugs.
**Correct Form**:
```go
package main
import (
"fmt"
"os"
)
func main() {
f, err := os.Open("filename.ext")
if err != nil {
fmt.Printf("error opening file: %v\n", err)
return
}
defer f.Close()
}
```
In the correct form, by checking the error (`err`) returned by `os.Open`, we gracefully handle error cases rather than simply ignoring them.
#### 10.5 Example: `gofmt`
**Incorrect Code Example**:
```go
package main
import "fmt"
func main() {
fmt.Println("Hello, world!")
}
```
**Issue**: This code snippet doesn't follow Go's standard formatting rules, for example, incorrect indentation of `fmt.Println`.
**Correct Form**:
```go
package main
import "fmt"
func main() {
fmt.Println("Hello, world!")
}
```
Using `gofmt` tool can automatically fix such formatting issues, ensuring the code adheres to the coding standards of the Go community.
#### 10.6 Example: `unused`
**Incorrect Code Example**:
```go
package main
func helper() {}
func main() {}
```
**Issue**: The `helper` function is defined but not called anywhere, indicating potential redundant code or missing functionality implementation.
**Correct Form**:
```go
package main
// If the helper function is indeed needed, ensure it's used properly.
func helper() {
// Implement the function's functionality or ensure it's called elsewhere
}
func main() {
helper()
}
```
To improve the section on Linters settings in the document, we'll expand with more detailed explanations and reinforce understanding through examples.
#### 10.7 Example: `dogsled`
**Incorrect Code Example**:
```go
func getValues() (int, int, int) {
return 1, 2, 3
}
func main() {
_, _, val := getValues()
fmt.Println(val) // Only interested in the third return value
}
```
**Explanation**: In the above code, we use two blank identifiers to ignore the first two return values. Excessive use of blank identifiers can make code reading difficult.
**Improved Code**:
Consider refactoring the function or the usage of return values to reduce the need for blank identifiers or explicitly comment why ignoring certain values is safe.
#### 10.8: `exhaustive`
**Incorrect Code Example**:
```go
type Fruit int
const (
Apple Fruit = iota
Banana
Orange
)
func getFruitName(f Fruit) string {
switch f {
case Apple:
return "Apple"
case Banana:
return "Banana"
// Missing handling for Orange
}
return "Unknown"
}
```
**Explanation**: In this code, the `switch` statement doesn't cover all possible values of the `Fruit` type; the case for `Orange` is missing.
**Improved Code**:
```go
func getFruitName(f Fruit) string {
switch f {
case Apple:
return "Apple"
case Banana:
return "Banana"
case Orange:
return "Orange"
}
return "Unknown"
}
```
By adding the missing `case`, we ensure the `switch` statement is exhaustive, handling every possible enum value.
#### 10.9 Optimization of Configuration Files and Application of Code Analysis Tools
Through these examples, we demonstrate how to improve code quality by identifying and fixing common coding issues. OpenIM's configuration files allow developers to customize linters' behavior according to project requirements, ensuring code compliance with predefined quality standards and style guidelines.
By employing these tools and configuration strategies, teams can reduce the number of bugs, enhance code maintainability, and facilitate efficient collaboration during code review processes.

@ -3,6 +3,7 @@ go 1.19
use (
.
./test/typecheck
./tools/codescan
./tools/changelog
./tools/component
./tools/data-conversion

@ -15,11 +15,9 @@
package api
import (
"github.com/gin-gonic/gin"
"github.com/OpenIMSDK/protocol/auth"
"github.com/OpenIMSDK/tools/a2r"
"github.com/gin-gonic/gin"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)

@ -15,11 +15,9 @@
package api
import (
"github.com/gin-gonic/gin"
"github.com/OpenIMSDK/protocol/conversation"
"github.com/OpenIMSDK/tools/a2r"
"github.com/gin-gonic/gin"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)

@ -15,24 +15,20 @@
package api
import (
"github.com/go-playground/validator/v10"
"github.com/OpenIMSDK/protocol/constant"
"github.com/go-playground/validator/v10"
)
// RequiredIf validates if the specified field is required based on the session type.
func RequiredIf(fl validator.FieldLevel) bool {
sessionType := fl.Parent().FieldByName("SessionType").Int()
switch sessionType {
case constant.SingleChatType, constant.NotificationChatType:
if fl.FieldName() == "RecvID" {
return fl.Field().String() != ""
}
return fl.FieldName() != "RecvID" || fl.Field().String() != ""
case constant.GroupChatType, constant.SuperGroupChatType:
if fl.FieldName() == "GroupID" {
return fl.Field().String() != ""
}
return fl.FieldName() != "GroupID" || fl.Field().String() != ""
default:
return true
}
return true
}

@ -17,10 +17,8 @@ package api
import (
"github.com/OpenIMSDK/protocol/friend"
"github.com/OpenIMSDK/tools/a2r"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/gin-gonic/gin"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)
type FriendApi rpcclient.Friend

@ -17,10 +17,8 @@ package api
import (
"github.com/OpenIMSDK/protocol/group"
"github.com/OpenIMSDK/tools/a2r"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/gin-gonic/gin"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)
type GroupApi rpcclient.Group

@ -27,11 +27,9 @@ import (
"github.com/gin-gonic/gin"
"github.com/go-playground/validator/v10"
"github.com/mitchellh/mapstructure"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)
@ -209,6 +207,7 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
// Prepare the message request with additional required data.
sendMsgReq, err := m.getSendMsgReq(c, req.SendMsg)
if err != nil {
// Log and respond with an error if preparation fails.
apiresp.GinError(c, err)
return
}
@ -224,7 +223,6 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
if err != nil {
// Set the status to failed and respond with an error if sending fails.
status = constant.MsgSendFailed
log.ZError(c, "send message err", err)
apiresp.GinError(c, err)
return
}
@ -238,7 +236,8 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
})
if err != nil {
// Log the error if updating the status fails.
log.ZError(c, "SetSendMsgStatus failed", err)
apiresp.GinError(c, err)
return
}
// Respond with a success message and the response payload.
@ -297,7 +296,6 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) {
resp apistruct.BatchSendMsgResp
)
if err := c.BindJSON(&req); err != nil {
log.ZError(c, "BatchSendMsg BindJSON failed", err)
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
@ -308,14 +306,12 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) {
}
var recvIDs []string
var err error
if req.IsSendAll {
pageNumber := 1
showNumber := 500
for {
recvIDsPart, err := m.userRpcClient.GetAllUserIDs(c, int32(pageNumber), int32(showNumber))
if err != nil {
log.ZError(c, "GetAllUserIDs failed", err)
apiresp.GinError(c, err)
return
}
@ -331,7 +327,6 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) {
log.ZDebug(c, "BatchSendMsg nums", "nums ", len(recvIDs))
sendMsgReq, err := m.getSendMsgReq(c, req.SendMsg)
if err != nil {
log.ZError(c, "decodeData failed", err)
apiresp.GinError(c, err)
return
}

@ -134,7 +134,7 @@ func Start(config *config.GlobalConfig, port int, proPort int) error {
}
func newGinRouter(disCov discoveryregistry.SvcDiscoveryRegistry, rdb redis.UniversalClient, config *config.GlobalConfig) *gin.Engine {
disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) // 默认RPC中间件
disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
gin.SetMode(gin.ReleaseMode)
r := gin.New()
if v, ok := binding.Validator.Engine().(*validator.Validate); ok {
@ -340,13 +340,11 @@ func GinParseToken(rdb redis.UniversalClient, config *config.GlobalConfig) gin.H
}
m, err := dataBase.GetTokensWithoutError(c, claims.UserID, claims.PlatformID)
if err != nil {
log.ZWarn(c, "cache get token error", errs.ErrTokenNotExist.Wrap())
apiresp.GinError(c, errs.ErrTokenNotExist.Wrap())
c.Abort()
return
}
if len(m) == 0 {
log.ZWarn(c, "cache do not exist token error", errs.ErrTokenNotExist.Wrap())
apiresp.GinError(c, errs.ErrTokenNotExist.Wrap())
c.Abort()
return
@ -355,12 +353,10 @@ func GinParseToken(rdb redis.UniversalClient, config *config.GlobalConfig) gin.H
switch v {
case constant.NormalToken:
case constant.KickedToken:
log.ZWarn(c, "cache kicked token error", errs.ErrTokenKicked.Wrap())
apiresp.GinError(c, errs.ErrTokenKicked.Wrap())
c.Abort()
return
default:
log.ZWarn(c, "cache unknown token error", errs.ErrTokenUnknown.Wrap())
apiresp.GinError(c, errs.ErrTokenUnknown.Wrap())
c.Abort()
return
@ -376,3 +372,10 @@ func GinParseToken(rdb redis.UniversalClient, config *config.GlobalConfig) gin.H
}
}
}
// // handleGinError logs and returns an error response through Gin context.
// func handleGinError(c *gin.Context, logMessage string, errType errs.CodeError, detail string) {
// wrappedErr := errType.Wrap(detail)
// apiresp.GinError(c, wrappedErr)
// c.Abort()
// }

@ -15,11 +15,9 @@
package api
import (
"github.com/gin-gonic/gin"
"github.com/OpenIMSDK/protocol/user"
"github.com/OpenIMSDK/tools/a2r"
"github.com/gin-gonic/gin"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)

@ -25,7 +25,6 @@ import (
"github.com/OpenIMSDK/tools/a2r"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)

@ -67,7 +67,7 @@ func (u *UserApi) GetUsers(c *gin.Context) {
func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) {
var req msggateway.GetUsersOnlineStatusReq
if err := c.BindJSON(&req); err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
apiresp.GinError(c, err)
return
}
conns, err := u.Discov.GetConns(c, u.Config.RpcRegisterName.OpenImMessageGatewayName)
@ -85,7 +85,7 @@ func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) {
msgClient := msggateway.NewMsgGatewayClient(v)
reply, err := msgClient.GetUsersOnlineStatus(c, &req)
if err != nil {
log.ZWarn(c, "GetUsersOnlineStatus rpc err", err)
log.ZDebug(c, "GetUsersOnlineStatus rpc error", err)
parseError := apiresp.ParseError(err)
if parseError.ErrCode == errs.NoPermissionError {

@ -20,7 +20,6 @@ import (
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/mcontext"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/http"

@ -22,18 +22,15 @@ import (
"sync"
"sync/atomic"
"github.com/OpenIMSDK/tools/errs"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"google.golang.org/protobuf/proto"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/apiresp"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"google.golang.org/protobuf/proto"
)
var (
@ -91,13 +88,7 @@ type Client struct {
// }
// ResetClient updates the client's state with new connection and context information.
func (c *Client) ResetClient(
ctx *UserConnContext,
conn LongConn,
isBackground, isCompress bool,
longConnServer LongConnServer,
token string,
) {
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isBackground, isCompress bool, longConnServer LongConnServer, token string) {
c.w = new(sync.Mutex)
c.conn = conn
c.PlatformID = utils.StringToInt(ctx.GetPlatformID())
@ -112,9 +103,11 @@ func (c *Client) ResetClient(
c.token = token
}
// pingHandler handles ping messages and sends pong responses.
func (c *Client) pingHandler(_ string) error {
_ = c.conn.SetReadDeadline(pongWait)
if err := c.conn.SetReadDeadline(pongWait); err != nil {
return err
}
return c.writePongMsg()
}
@ -141,7 +134,8 @@ func (c *Client) readMessage() {
}
log.ZDebug(c.ctx, "readMessage", "messageType", messageType)
if c.closed.Load() { // 连接刚置位已经关闭,但是协程还没退出的场景
if c.closed.Load() {
// The scenario where the connection has just been closed, but the coroutine has not exited
c.closedErr = ErrConnClosed
return
}
@ -185,11 +179,11 @@ func (c *Client) handleMessage(message []byte) error {
err := c.longConnServer.Decode(message, binaryReq)
if err != nil {
return errs.Wrap(err)
return err
}
if err := c.longConnServer.Validate(binaryReq); err != nil {
return errs.Wrap(err)
return err
}
if binaryReq.SendID != c.UserID {
@ -239,7 +233,7 @@ func (c *Client) setAppBackgroundStatus(ctx context.Context, req *Req) ([]byte,
}
c.IsBackground = isBackground
// todo callback
// TODO: callback
return resp, nil
}
@ -273,7 +267,7 @@ func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, re
}
if binaryReq.ReqIdentifier == WsLogoutMsg {
return errors.New("user logout")
return errs.Wrap(errors.New("user logout"))
}
return nil
}
@ -316,17 +310,21 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
encodedBuf, err := c.longConnServer.Encode(resp)
if err != nil {
return errs.Wrap(err)
return err
}
c.w.Lock()
defer c.w.Unlock()
_ = c.conn.SetWriteDeadline(writeWait)
err = c.conn.SetWriteDeadline(writeWait)
if err != nil {
return err
}
if c.IsCompress {
resultBuf, compressErr := c.longConnServer.CompressWithPool(encodedBuf)
if compressErr != nil {
return errs.Wrap(compressErr)
return compressErr
}
return c.conn.WriteMessage(MessageBinary, resultBuf)
}
@ -344,7 +342,7 @@ func (c *Client) writePongMsg() error {
err := c.conn.SetWriteDeadline(writeWait)
if err != nil {
return errs.Wrap(err)
return err
}
return c.conn.WriteMessage(PongMessage, nil)

@ -17,7 +17,6 @@ package msggateway
import (
"bytes"
"compress/gzip"
"errors"
"io"
"sync"
@ -46,12 +45,15 @@ func NewGzipCompressor() *GzipCompressor {
func (g *GzipCompressor) Compress(rawData []byte) ([]byte, error) {
gzipBuffer := bytes.Buffer{}
gz := gzip.NewWriter(&gzipBuffer)
if _, err := gz.Write(rawData); err != nil {
return nil, errs.Wrap(err)
return nil, errs.Wrap(err, "GzipCompressor.Compress: writing to gzip writer failed")
}
if err := gz.Close(); err != nil {
return nil, errs.Wrap(err)
return nil, errs.Wrap(err, "GzipCompressor.Compress: closing gzip writer failed")
}
return gzipBuffer.Bytes(), nil
}
@ -63,10 +65,10 @@ func (g *GzipCompressor) CompressWithPool(rawData []byte) ([]byte, error) {
gz.Reset(&gzipBuffer)
if _, err := gz.Write(rawData); err != nil {
return nil, errs.Wrap(err)
return nil, errs.Wrap(err, "GzipCompressor.CompressWithPool: error writing data")
}
if err := gz.Close(); err != nil {
return nil, errs.Wrap(err)
return nil, errs.Wrap(err, "GzipCompressor.CompressWithPool: error closing gzip writer")
}
return gzipBuffer.Bytes(), nil
}
@ -75,32 +77,36 @@ func (g *GzipCompressor) DeCompress(compressedData []byte) ([]byte, error) {
buff := bytes.NewBuffer(compressedData)
reader, err := gzip.NewReader(buff)
if err != nil {
return nil, errs.Wrap(err, "NewReader failed")
return nil, errs.Wrap(err, "GzipCompressor.DeCompress: NewReader creation failed")
}
compressedData, err = io.ReadAll(reader)
decompressedData, err := io.ReadAll(reader)
if err != nil {
return nil, errs.Wrap(err, "ReadAll failed")
return nil, errs.Wrap(err, "GzipCompressor.DeCompress: reading from gzip reader failed")
}
_ = reader.Close()
return compressedData, nil
if err = reader.Close(); err != nil {
// Even if closing the reader fails, we've successfully read the data,
// so we return the decompressed data and an error indicating the close failure.
return decompressedData, errs.Wrap(err, "GzipCompressor.DeCompress: closing gzip reader failed")
}
return decompressedData, nil
}
func (g *GzipCompressor) DecompressWithPool(compressedData []byte) ([]byte, error) {
reader := gzipReaderPool.Get().(*gzip.Reader)
if reader == nil {
return nil, errs.Wrap(errors.New("NewReader failed"))
}
defer gzipReaderPool.Put(reader)
err := reader.Reset(bytes.NewReader(compressedData))
if err != nil {
return nil, errs.Wrap(err, "NewReader failed")
return nil, errs.Wrap(err, "GzipCompressor.DecompressWithPool: resetting gzip reader failed")
}
compressedData, err = io.ReadAll(reader)
decompressedData, err := io.ReadAll(reader)
if err != nil {
return nil, errs.Wrap(err, "ReadAll failed")
return nil, errs.Wrap(err, "GzipCompressor.DecompressWithPool: reading from pooled gzip reader failed")
}
if err = reader.Close(); err != nil {
// Similar to DeCompress, return the data and error for close failure.
return decompressedData, errs.Wrap(err, "GzipCompressor.DecompressWithPool: closing pooled gzip reader failed")
}
_ = reader.Close()
return compressedData, nil
return decompressedData, nil
}

@ -37,10 +37,16 @@ func TestCompressDecompress(t *testing.T) {
// compress
dest, err := compressor.CompressWithPool(src)
if err != nil {
t.Log(err)
}
assert.Equal(t, nil, err)
// decompress
res, err := compressor.DecompressWithPool(dest)
if err != nil {
t.Log(err)
}
assert.Equal(t, nil, err)
// check
@ -60,10 +66,16 @@ func TestCompressDecompressWithConcurrency(t *testing.T) {
// compress
dest, err := compressor.CompressWithPool(src)
if err != nil {
t.Log(err)
}
assert.Equal(t, nil, err)
// decompress
res, err := compressor.DecompressWithPool(dest)
if err != nil {
t.Log(err)
}
assert.Equal(t, nil, err)
// check
@ -99,6 +111,7 @@ func BenchmarkDecompress(b *testing.B) {
compressor := NewGzipCompressor()
comdata, err := compressor.Compress(src)
assert.Equal(b, nil, err)
for i := 0; i < b.N; i++ {

@ -37,7 +37,7 @@ func (g *GobEncoder) Encode(data any) ([]byte, error) {
enc := gob.NewEncoder(&buff)
err := enc.Encode(data)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "GobEncoder.Encode failed")
}
return buff.Bytes(), nil
}
@ -47,7 +47,7 @@ func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error {
dec := gob.NewDecoder(buff)
err := dec.Decode(decodeData)
if err != nil {
return errs.Wrap(err)
return errs.Wrap(err, "GobEncoder.Decode failed")
}
return nil
}

@ -17,8 +17,6 @@ package msggateway
import (
"context"
"google.golang.org/grpc"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/msggateway"
"github.com/OpenIMSDK/tools/discoveryregistry"
@ -26,11 +24,11 @@ import (
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"google.golang.org/grpc"
)
func (s *Server) InitServer(config *config.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {

@ -23,14 +23,7 @@ import (
// RunWsAndServer run ws server.
func RunWsAndServer(conf *config.GlobalConfig, rpcPort, wsPort, prometheusPort int) error {
fmt.Println(
"start rpc/msg_gateway server, port: ",
rpcPort,
wsPort,
prometheusPort,
", OpenIM version: ",
config.Version,
)
fmt.Println("start rpc/msg_gateway server, port: ", rpcPort, wsPort, prometheusPort, ", OpenIM version: ", config.Version)
longServer, err := NewWsServer(
conf,
WithPort(wsPort),

@ -15,9 +15,11 @@
package msggateway
import (
"errors"
"net/http"
"time"
"github.com/OpenIMSDK/tools/errs"
"github.com/gorilla/websocket"
)
@ -72,7 +74,8 @@ func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) er
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return err
// The upgrader.Upgrade method usually returns enough error messages to diagnose problems that may occur during the upgrade
return errs.Wrap(err, "GenerateLongConn: WebSocket upgrade failed")
}
d.conn = conn
return nil
@ -96,7 +99,16 @@ func (d *GWebSocket) SetReadDeadline(timeout time.Duration) error {
}
func (d *GWebSocket) SetWriteDeadline(timeout time.Duration) error {
return d.conn.SetWriteDeadline(time.Now().Add(timeout))
// TODO add error
if timeout <= 0 {
return errs.Wrap(errors.New("timeout must be greater than 0"))
}
// TODO SetWriteDeadline Future add error handling
if err := d.conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
return errs.Wrap(err, "GWebSocket.SetWriteDeadline failed")
}
return nil
}
func (d *GWebSocket) Dial(urlStr string, requestHeader http.Header) (*http.Response, error) {

@ -19,17 +19,15 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"sync"
"github.com/OpenIMSDK/protocol/push"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/go-playground/validator/v10"
"google.golang.org/protobuf/proto"
"github.com/OpenIMSDK/protocol/msg"
"github.com/OpenIMSDK/protocol/push"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/utils"
"github.com/go-playground/validator/v10"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"google.golang.org/protobuf/proto"
)
type Req struct {
@ -120,10 +118,10 @@ func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDi
func (g GrpcHandler) GetSeq(context context.Context, data *Req) ([]byte, error) {
req := sdkws.GetMaxSeqReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, err
return nil, errs.Wrap(err, "GetSeq: error unmarshaling request")
}
if err := g.validate.Struct(&req); err != nil {
return nil, err
return nil, errs.Wrap(err, "GetSeq: validation failed")
}
resp, err := g.msgRpcClient.GetMaxSeq(context, &req)
if err != nil {
@ -131,28 +129,37 @@ func (g GrpcHandler) GetSeq(context context.Context, data *Req) ([]byte, error)
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "GetSeq: error marshaling response")
}
return c, nil
}
func (g GrpcHandler) SendMessage(context context.Context, data *Req) ([]byte, error) {
msgData := sdkws.MsgData{}
// SendMessage handles the sending of messages through gRPC. It unmarshals the request data,
// validates the message, and then sends it using the message RPC client.
func (g GrpcHandler) SendMessage(ctx context.Context, data *Req) ([]byte, error) {
// Unmarshal the message data from the request.
var msgData sdkws.MsgData
if err := proto.Unmarshal(data.Data, &msgData); err != nil {
return nil, err
return nil, errs.Wrap(err, "error unmarshalling message data")
}
// Validate the message data structure.
if err := g.validate.Struct(&msgData); err != nil {
return nil, err
return nil, errs.Wrap(err, "message data validation failed")
}
req := msg.SendMsgReq{MsgData: &msgData}
resp, err := g.msgRpcClient.SendMsg(context, &req)
resp, err := g.msgRpcClient.SendMsg(ctx, &req)
if err != nil {
return nil, err
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "error marshaling response")
}
return c, nil
}
@ -163,7 +170,7 @@ func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]by
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "error marshaling response")
}
return c, nil
}
@ -171,10 +178,10 @@ func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]by
func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([]byte, error) {
req := sdkws.PullMessageBySeqsReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, err
return nil, errs.Wrap(err, "error unmarshaling request")
}
if err := g.validate.Struct(data); err != nil {
return nil, err
return nil, errs.Wrap(err, "validation failed")
}
resp, err := g.msgRpcClient.PullMessageBySeqList(context, &req)
if err != nil {
@ -182,7 +189,7 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "error marshaling response")
}
return c, nil
}
@ -190,7 +197,7 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([
func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, error) {
req := push.DelUserPushTokenReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, err
return nil, errs.Wrap(err, "error unmarshaling request")
}
resp, err := g.pushClient.DelUserPushToken(context, &req)
if err != nil {
@ -198,7 +205,7 @@ func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, err
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "error marshaling response")
}
return c, nil
}
@ -206,10 +213,10 @@ func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, err
func (g GrpcHandler) SetUserDeviceBackground(_ context.Context, data *Req) ([]byte, bool, error) {
req := sdkws.SetAppBackgroundStatusReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, false, err
return nil, false, errs.Wrap(err, "error unmarshaling request")
}
if err := g.validate.Struct(data); err != nil {
return nil, false, err
return nil, false, errs.Wrap(err, "validation failed")
}
return nil, req.IsBackground, nil
}

@ -25,24 +25,21 @@ import (
"sync/atomic"
"time"
"github.com/OpenIMSDK/tools/apiresp"
"github.com/go-playground/validator/v10"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/msggateway"
"github.com/OpenIMSDK/tools/apiresp"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/utils"
"github.com/go-playground/validator/v10"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
)
type LongConnServer interface {
@ -89,6 +86,7 @@ type WsServer struct {
Encoder
MessageHandler
}
type kickHandler struct {
clientOK bool
oldClients []*Client
@ -130,7 +128,9 @@ func (ws *WsServer) UnRegister(c *Client) {
}
func (ws *WsServer) Validate(s any) error {
//?question?
if s == nil {
return errs.Wrap(errors.New("input cannot be nil"))
}
return nil
}
@ -278,7 +278,7 @@ func (ws *WsServer) registerClient(client *Client) {
log.ZDebug(client.ctx, "user exist", "userID", client.UserID, "platformID", client.PlatformID)
if clientOK {
ws.clients.Set(client.UserID, client)
// 已经有同平台的连接存在
// There is already a connection to the platform
log.ZInfo(client.ctx, "repeat login", "userID", client.UserID, "platformID", client.PlatformID, "old remote addr", getRemoteAdders(oldClients))
ws.onlineUserConnNum.Add(1)
} else {

@ -19,15 +19,15 @@ import "time"
type (
Option func(opt *configs)
configs struct {
// 长连接监听端口
// Long connection listening port
port int
// 长连接允许最大链接数
// Maximum number of connections allowed for long connection
maxConnNum int64
// 连接握手超时时间
// Connection handshake timeout
handshakeTimeout time.Duration
// 允许消息最大长度
// Maximum length allowed for messages
messageMaxMsgLength int
// websocket write buffer, default: 4096, 4kb.
// Websocket write buffer, default: 4096, 4kb.
writeBufferSize int
}
)

@ -18,23 +18,13 @@ import (
"context"
"errors"
"fmt"
"github.com/OpenIMSDK/tools/errs"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/mw"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
@ -42,11 +32,22 @@ import (
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type MsgTransfer struct {
historyCH *OnlineHistoryRedisConsumerHandler // 这个消费者聚合消息, 订阅的topicws2ms_chat, 修改通知发往msg_to_modify topic, 消息存入redis后Incr Redis, 再发消息到ms2pschat topic推送 发消息到msg_to_mongo topic持久化
historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息以及处理删除通知消息删除的 订阅的topic: msg_to_mongo
// This consumer aggregated messages, subscribed to the topic:ws2ms_chat,
// the modification notification is sent to msg_to_modify topic, the message is stored in redis, Incr Redis,
// and then the message is sent to ms2pschat topic for push, and the message is sent to msg_to_mongo topic for persistence
historyCH *OnlineHistoryRedisConsumerHandler
historyMongoCH *OnlineHistoryMongoConsumerHandler
// mongoDB batch insert, delete messages in redis after success,
// and handle the deletion notification message deleted subscriptions topic: msg_to_mongo
ctx context.Context
cancel context.CancelFunc
config *config.GlobalConfig
@ -74,6 +75,7 @@ func StartTransfer(config *config.GlobalConfig, prometheusPort int) error {
if err := client.CreateRpcRootNodes(config.GetServiceNames()); err != nil {
return err
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
msgModel := cache.NewMsgCacheModel(rdb, config)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase(config.Mongo.Database))

@ -22,24 +22,20 @@ import (
"sync/atomic"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/OpenIMSDK/tools/errs"
"github.com/IBM/sarama"
"github.com/go-redis/redis"
"google.golang.org/protobuf/proto"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
"github.com/go-redis/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"google.golang.org/protobuf/proto"
)
const (
@ -174,21 +170,13 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
notStorageNotificationList,
)
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.uniqueKey, conversationIDNotification, modifyMsgList); err != nil {
log.ZError(
ctx,
"msg to modify mq error",
err,
"uniqueKey",
msgChannelValue.uniqueKey,
"modifyMsgList",
modifyMsgList,
)
log.ZError(ctx, "msg to modify mq error", err, "uniqueKey", msgChannelValue.uniqueKey, "modifyMsgList", modifyMsgList)
}
}
}
}
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,.
// Get messages/notifications stored message list, not stored and pushed message list.
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(
totalMsgs []*ContextMsg,
) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {
@ -209,7 +197,7 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(
// clone msg from notificationMsg
if options.IsSendMsg() {
msg := proto.Clone(v.message).(*sdkws.MsgData)
// 消息
// message
if v.message.Options != nil {
msg.Options = msgprocessor.NewMsgOptions()
}
@ -284,7 +272,8 @@ func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(
msgs []*sdkws.MsgData,
) {
for _, v := range msgs {
och.msgDatabase.MsgToPushMQ(ctx, key, conversationID, v)
och.msgDatabase.MsgToPushMQ(ctx, key, conversationID, v) // nolint: errcheck
}
}

@ -18,15 +18,13 @@ import (
"context"
"github.com/IBM/sarama"
"google.golang.org/protobuf/proto"
pbmsg "github.com/OpenIMSDK/protocol/msg"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
kfk "github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"google.golang.org/protobuf/proto"
)
type OnlineHistoryMongoConsumerHandler struct {

@ -21,7 +21,6 @@ import (
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/http"
@ -56,10 +55,12 @@ func callbackOfflinePush(
AtUserIDs: msg.AtUserIDList,
Content: GetContent(msg),
}
resp := &callbackstruct.CallbackBeforePushResp{}
if err := http.CallBackPostReturn(ctx, config.Callback.CallbackUrl, req, resp, config.Callback.CallbackOfflinePush); err != nil {
return err
}
if len(resp.UserIDs) != 0 {
*offlinePushUserIDs = resp.UserIDs
}

@ -39,6 +39,8 @@ type Fcm struct {
cache cache.MsgModel
}
// NewClient initializes a new FCM client using the Firebase Admin SDK.
// It requires the FCM service account credentials file located within the project's configuration directory.
func NewClient(globalConfig *config.GlobalConfig, cache cache.MsgModel) *Fcm {
projectRoot := config.GetProjectRoot()
credentialsFilePath := filepath.Join(projectRoot, "config", globalConfig.Push.Fcm.ServiceAccount)
@ -47,12 +49,12 @@ func NewClient(globalConfig *config.GlobalConfig, cache cache.MsgModel) *Fcm {
if err != nil {
return nil
}
ctx := context.Background()
fcmMsgClient, err := fcmApp.Messaging(ctx)
if err != nil {
return nil
}
return &Fcm{fcmMsgCli: fcmMsgClient, cache: cache}
}
@ -125,7 +127,6 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string,
response, err := f.fcmMsgCli.SendAll(ctx, messages)
if err != nil {
Fail = Fail + messageCount
// log.Info(operationID, "some token push err", err.Error(), messageCount)
} else {
Success = Success + response.SuccessCount
Fail = Fail + response.FailureCount

@ -23,17 +23,15 @@ import (
"sync"
"time"
"github.com/redis/go-redis/v9"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils/splitter"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
http2 "github.com/openimsdk/open-im-server/v3/pkg/common/http"
"github.com/redis/go-redis/v9"
)
var (

@ -18,16 +18,14 @@ import (
"context"
"github.com/IBM/sarama"
"google.golang.org/protobuf/proto"
"github.com/OpenIMSDK/protocol/constant"
pbchat "github.com/OpenIMSDK/protocol/msg"
pbpush "github.com/OpenIMSDK/protocol/push"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
kfk "github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
"google.golang.org/protobuf/proto"
)
type ConsumerHandler struct {

@ -18,19 +18,16 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/OpenIMSDK/tools/utils"
"google.golang.org/grpc"
"github.com/OpenIMSDK/protocol/constant"
pbpush "github.com/OpenIMSDK/protocol/push"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"google.golang.org/grpc"
)
type pushServer struct {

@ -20,10 +20,6 @@ import (
"errors"
"sync"
"google.golang.org/grpc"
"golang.org/x/sync/errgroup"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/conversation"
"github.com/OpenIMSDK/protocol/msggateway"
@ -32,7 +28,6 @@ import (
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/fcm"
@ -45,6 +40,8 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
type Pusher struct {
@ -231,7 +228,8 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
}(groupID, kickedUsers)
pushToUserIDs = append(pushToUserIDs, kickedUsers...)
case constant.GroupDismissedNotification:
if msgprocessor.IsNotification(msgprocessor.GetConversationIDByMsg(msg)) { // 消息先到,通知后到
// Messages arrive first, notifications arrive later
if msgprocessor.IsNotification(msgprocessor.GetConversationIDByMsg(msg)) {
var tips sdkws.GroupDismissedTips
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
return err

@ -17,10 +17,6 @@ package auth
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"google.golang.org/grpc"
pbauth "github.com/OpenIMSDK/protocol/auth"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/msggateway"
@ -29,12 +25,13 @@ import (
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/tokenverify"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"google.golang.org/grpc"
)
type authServer struct {

@ -20,28 +20,23 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"sort"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/tx"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"google.golang.org/grpc"
"github.com/OpenIMSDK/protocol/constant"
pbconversation "github.com/OpenIMSDK/protocol/conversation"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/tx"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"google.golang.org/grpc"
)
type conversationServer struct {
@ -313,7 +308,7 @@ func (c *conversationServer) SetConversations(ctx context.Context,
unequal++
}
}
if err := c.conversationDatabase.SetUsersConversationFiledTx(ctx, req.UserIDs, &conversation, m); err != nil {
if err := c.conversationDatabase.SetUsersConversationFieldTx(ctx, req.UserIDs, &conversation, m); err != nil {
return nil, err
}
if unequal > 0 {
@ -324,7 +319,7 @@ func (c *conversationServer) SetConversations(ctx context.Context,
return &pbconversation.SetConversationsResp{}, nil
}
// 获取超级大群开启免打扰的用户ID.
// Get user IDs with "Do Not Disturb" enabled in super large groups.
func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbconversation.GetRecvMsgNotNotifyUserIDsReq) (*pbconversation.GetRecvMsgNotNotifyUserIDsResp, error) {
//userIDs, err := c.conversationDatabase.FindRecvMsgNotNotifyUserIDs(ctx, req.GroupID)
//if err != nil {
@ -381,7 +376,7 @@ func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, r
}
func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbconversation.SetConversationMaxSeqReq) (*pbconversation.SetConversationMaxSeqResp, error) {
if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, req.OwnerUserID, req.ConversationID,
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID,
map[string]any{"max_seq": req.MaxSeq}); err != nil {
return nil, err
}

@ -18,17 +18,15 @@ import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
pbfriend "github.com/OpenIMSDK/protocol/friend"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)
func (s *friendServer) GetPaginationBlacks(ctx context.Context, req *pbfriend.GetPaginationBlacksReq) (resp *pbfriend.GetPaginationBlacksResp, err error) {
if accessErr := s.userRpcClient.Access(ctx, req.UserID); accessErr != nil {
if err := s.userRpcClient.Access(ctx, req.UserID); err != nil {
return nil, err
}
total, blacks, err := s.blackDatabase.FindOwnerBlacks(ctx, req.UserID, req.Pagination)

@ -19,7 +19,6 @@ import (
pbfriend "github.com/OpenIMSDK/protocol/friend"
"github.com/OpenIMSDK/tools/utils"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/http"

@ -118,7 +118,7 @@ func Start(config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, se
// ok.
func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) (resp *pbfriend.ApplyToAddFriendResp, err error) {
resp = &pbfriend.ApplyToAddFriendResp{}
if accessErr := authverify.CheckAccessV3(ctx, req.FromUserID, s.config); accessErr != nil {
if err := authverify.CheckAccessV3(ctx, req.FromUserID,s.config); err != nil {
return nil, err
}
if req.ToUserID == req.FromUserID {
@ -127,7 +127,7 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.Apply
if err = CallbackBeforeAddFriend(ctx, s.config, req); err != nil && err != errs.ErrCallbackContinue {
return nil, err
}
if _, getUserInfoErr := s.userRpcClient.GetUsersInfoMap(ctx, []string{req.ToUserID, req.FromUserID}); getUserInfoErr != nil {
if _, err := s.userRpcClient.GetUsersInfoMap(ctx, []string{req.ToUserID, req.FromUserID}); err != nil {
return nil, err
}
in1, in2, err := s.friendDatabase.CheckIn(ctx, req.FromUserID, req.ToUserID)
@ -222,7 +222,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.Res
func (s *friendServer) DeleteFriend(ctx context.Context, req *pbfriend.DeleteFriendReq) (resp *pbfriend.DeleteFriendResp, err error) {
defer log.ZInfo(ctx, utils.GetFuncName()+" Return")
resp = &pbfriend.DeleteFriendResp{}
if accessErr := s.userRpcClient.Access(ctx, req.OwnerUserID); accessErr != nil {
if err := s.userRpcClient.Access(ctx, req.OwnerUserID); err != nil {
return nil, err
}
_, err = s.friendDatabase.FindFriendsWithError(ctx, req.OwnerUserID, []string{req.FriendUserID})
@ -247,7 +247,7 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbfriend.SetFri
return nil, err
}
resp = &pbfriend.SetFriendRemarkResp{}
if accessErr := s.userRpcClient.Access(ctx, req.OwnerUserID); accessErr != nil {
if err := s.userRpcClient.Access(ctx, req.OwnerUserID); err != nil {
return nil, err
}
_, err = s.friendDatabase.FindFriendsWithError(ctx, req.OwnerUserID, []string{req.FriendUserID})
@ -281,6 +281,7 @@ func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbfriend.G
return resp, nil
}
// Get the list of friend requests sent out proactively.
func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context,
req *pbfriend.GetDesignatedFriendsApplyReq) (resp *pbfriend.GetDesignatedFriendsApplyResp, err error) {
friendRequests, err := s.friendDatabase.FindBothFriendRequests(ctx, req.FromUserID, req.ToUserID)
@ -295,10 +296,10 @@ func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context,
return resp, nil
}
// ok 获取接收到的好友申请(即别人主动申请的).
// Get received friend requests (i.e., those initiated by others).
func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbfriend.GetPaginationFriendsApplyToReq) (resp *pbfriend.GetPaginationFriendsApplyToResp, err error) {
defer log.ZInfo(ctx, utils.GetFuncName()+" Return")
if accessErr := s.userRpcClient.Access(ctx, req.UserID); accessErr != nil {
if err := s.userRpcClient.Access(ctx, req.UserID); err != nil {
return nil, err
}
total, friendRequests, err := s.friendDatabase.PageFriendRequestToMe(ctx, req.UserID, req.Pagination)
@ -314,11 +315,10 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbf
return resp, nil
}
// ok 获取主动发出去的好友申请列表.
func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *pbfriend.GetPaginationFriendsApplyFromReq) (resp *pbfriend.GetPaginationFriendsApplyFromResp, err error) {
defer log.ZInfo(ctx, utils.GetFuncName()+" Return")
resp = &pbfriend.GetPaginationFriendsApplyFromResp{}
if accessErr := s.userRpcClient.Access(ctx, req.UserID); accessErr != nil {
if err := s.userRpcClient.Access(ctx, req.UserID); err != nil {
return nil, err
}
total, friendRequests, err := s.friendDatabase.PageFriendRequestFromMe(ctx, req.UserID, req.Pagination)
@ -346,7 +346,7 @@ func (s *friendServer) IsFriend(ctx context.Context, req *pbfriend.IsFriendReq)
func (s *friendServer) GetPaginationFriends(ctx context.Context, req *pbfriend.GetPaginationFriendsReq) (resp *pbfriend.GetPaginationFriendsResp, err error) {
defer log.ZInfo(ctx, utils.GetFuncName()+" Return")
if accessErr := s.userRpcClient.Access(ctx, req.UserID); accessErr != nil {
if err := s.userRpcClient.Access(ctx, req.UserID); err != nil {
return nil, err
}
total, friends, err := s.friendDatabase.PageOwnerFriends(ctx, req.UserID, req.Pagination)
@ -364,7 +364,7 @@ func (s *friendServer) GetPaginationFriends(ctx context.Context, req *pbfriend.G
func (s *friendServer) GetFriendIDs(ctx context.Context, req *pbfriend.GetFriendIDsReq) (resp *pbfriend.GetFriendIDsResp, err error) {
defer log.ZInfo(ctx, utils.GetFuncName()+" Return")
if accessErr := s.userRpcClient.Access(ctx, req.UserID); accessErr != nil {
if err := s.userRpcClient.Access(ctx, req.UserID); err != nil {
return nil, err
}
resp = &pbfriend.GetFriendIDsResp{}

@ -18,7 +18,6 @@ import (
"context"
pbgroup "github.com/OpenIMSDK/protocol/group"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
)

@ -18,16 +18,13 @@ import (
"context"
"time"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/group"
pbgroup "github.com/OpenIMSDK/protocol/group"
"github.com/OpenIMSDK/protocol/wrapperspb"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
pbgroup "github.com/OpenIMSDK/protocol/group"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"

@ -16,7 +16,6 @@ package group
import (
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)

@ -18,10 +18,9 @@ import (
"context"
"time"
"github.com/OpenIMSDK/tools/mcontext"
pbgroup "github.com/OpenIMSDK/protocol/group"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/mcontext"
)
func UpdateGroupInfoMap(ctx context.Context, group *sdkws.GroupInfoForSet) map[string]any {

@ -24,40 +24,31 @@ import (
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
pbconversation "github.com/OpenIMSDK/protocol/conversation"
"github.com/OpenIMSDK/protocol/wrapperspb"
"github.com/OpenIMSDK/tools/tx"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/grouphash"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/OpenIMSDK/tools/mw/specialerror"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"google.golang.org/grpc"
"github.com/OpenIMSDK/protocol/constant"
pbconversation "github.com/OpenIMSDK/protocol/conversation"
pbgroup "github.com/OpenIMSDK/protocol/group"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/protocol/wrapperspb"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/mw/specialerror"
"github.com/OpenIMSDK/tools/tx"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/grouphash"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"google.golang.org/grpc"
)
func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
@ -540,7 +531,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou
if err != nil {
return nil, err
}
if populateErr := s.PopulateGroupMember(ctx, members...); populateErr != nil {
if err := s.PopulateGroupMember(ctx, members...); err != nil {
return nil, err
}
memberMap := make(map[string]*relationtb.GroupMemberModel)
@ -767,11 +758,11 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup
}
var inGroup bool
if _, takeErr := s.db.TakeGroupMember(ctx, req.GroupID, req.FromUserID); takeErr == nil {
inGroup = true // 已经在群里了
inGroup = true // Already in group
} else if !s.IsNotFound(err) {
return nil, err
}
if _, getInfoErr := s.User.GetPublicUserInfo(ctx, req.FromUserID); getInfoErr != nil {
if _, err := s.User.GetPublicUserInfo(ctx, req.FromUserID); err != nil {
return nil, err
}
var member *relationtb.GroupMemberModel
@ -858,14 +849,14 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
JoinTime: time.Now(),
MuteEndTime: time.UnixMilli(0),
}
if callbackErr := CallbackBeforeMemberJoinGroup(ctx, s.config, groupMember, group.Ex); callbackErr != nil {
if err := CallbackBeforeMemberJoinGroup(ctx, s.config, groupMember, group.Ex); err != nil {
return nil, err
}
if createErr := s.db.CreateGroup(ctx, nil, []*relationtb.GroupMemberModel{groupMember}); createErr != nil {
if err := s.db.CreateGroup(ctx, nil, []*relationtb.GroupMemberModel{groupMember}); err != nil {
return nil, err
}
if createErr := s.conversationRpcClient.GroupChatFirstCreateConversation(ctx, req.GroupID, []string{req.InviterUserID}); createErr != nil {
if err := s.conversationRpcClient.GroupChatFirstCreateConversation(ctx, req.GroupID, []string{req.InviterUserID}); err != nil {
return nil, err
}
s.Notification.MemberEnterNotification(ctx, req.GroupID, req.InviterUserID)
@ -906,7 +897,7 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbgroup.QuitGroupReq)
if member.RoleLevel == constant.GroupOwner {
return nil, errs.ErrNoPermission.Wrap("group owner can't quit")
}
if populateErr := s.PopulateGroupMember(ctx, member); populateErr != nil {
if err := s.PopulateGroupMember(ctx, member); err != nil {
return nil, err
}
err = s.db.DeleteGroupMember(ctx, req.GroupID, []string{req.UserID})
@ -968,14 +959,14 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf
if err != nil {
return nil, err
}
if populateErr := s.PopulateGroupMember(ctx, owner); populateErr != nil {
if err := s.PopulateGroupMember(ctx, owner); err != nil {
return nil, err
}
update := UpdateGroupInfoMap(ctx, req.GroupInfoForSet)
if len(update) == 0 {
return resp, nil
}
if updateErr := s.db.UpdateGroup(ctx, group.GroupID, update); updateErr != nil {
if err := s.db.UpdateGroup(ctx, group.GroupID, update); err != nil {
return nil, err
}
group, err = s.db.TakeGroup(ctx, req.GroupInfoForSet.GroupID)
@ -1169,7 +1160,7 @@ func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbgrou
if err != nil {
return nil, err
}
if populateErr := s.PopulateGroupMember(ctx, owners...); populateErr != nil {
if err := s.PopulateGroupMember(ctx, owners...); err != nil {
return nil, err
}
ownerMap := utils.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string {
@ -1201,7 +1192,7 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbgroup.DismissGrou
return nil, errs.ErrNoPermission.Wrap("not group owner")
}
}
if populateErr := s.PopulateGroupMember(ctx, owner); populateErr != nil {
if err := s.PopulateGroupMember(ctx, owner); err != nil {
return nil, err
}
group, err := s.db.TakeGroup(ctx, req.GroupID)
@ -1211,7 +1202,7 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbgroup.DismissGrou
if !req.DeleteMember && group.Status == constant.GroupStatusDismissed {
return nil, errs.ErrDismissedAlready.Wrap("group status is dismissed")
}
if dismissErr := s.db.DismissGroup(ctx, req.GroupID, req.DeleteMember); dismissErr != nil {
if err := s.db.DismissGroup(ctx, req.GroupID, req.DeleteMember); err != nil {
return nil, err
}
if !req.DeleteMember {
@ -1567,7 +1558,7 @@ func (s *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req *
if err != nil {
return nil, err
}
if populateErr := s.PopulateGroupMember(ctx, owners...); populateErr != nil {
if err := s.PopulateGroupMember(ctx, owners...); err != nil {
return nil, err
}
ownerMap := utils.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string {

@ -17,17 +17,14 @@ package msg
import (
"context"
utils2 "github.com/OpenIMSDK/tools/utils"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/redis/go-redis/v9"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/msg"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
utils2 "github.com/OpenIMSDK/tools/utils"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/redis/go-redis/v9"
)
func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *msg.GetConversationsHasReadAndMaxSeqReq) (resp *msg.GetConversationsHasReadAndMaxSeqResp, err error) {
@ -82,7 +79,7 @@ func (m *msgServer) SetConversationHasReadSeq(
if req.HasReadSeq > maxSeq {
return nil, errs.ErrArgs.Wrap("hasReadSeq must not be bigger than maxSeq")
}
if setErr := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); setErr != nil {
if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
return nil, err
}
if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID,

@ -17,19 +17,16 @@ package msg
import (
"context"
"github.com/OpenIMSDK/protocol/sdkws"
"google.golang.org/protobuf/proto"
"github.com/OpenIMSDK/protocol/constant"
pbchat "github.com/OpenIMSDK/protocol/msg"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/http"
"google.golang.org/protobuf/proto"
)
func toCommonCallback(ctx context.Context, msg *pbchat.SendMsgReq, command string) cbapi.CommonCallbackReq {

@ -17,14 +17,13 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/conversation"
"github.com/OpenIMSDK/protocol/msg"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
)
func (m *msgServer) getMinSeqs(maxSeqs map[string]int64) map[string]int64 {

@ -21,7 +21,6 @@ import (
"github.com/OpenIMSDK/protocol/msg"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)

@ -67,7 +67,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
if !authverify.IsAppManagerUid(ctx, m.config) {
switch msgs[0].SessionType {
case constant.SingleChatType:
if accessErr := authverify.CheckAccessV3(ctx, msgs[0].SendID, m.config); accessErr != nil {
if err := authverify.CheckAccessV3(ctx, msgs[0].SendID, m.config); err != nil {
return nil, err
}
role = user.AppMangerLevel
@ -132,7 +132,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
} else {
recvID = msgs[0].RecvID
}
if notificationErr := m.notificationSender.NotificationWithSesstionType(ctx, req.UserID, recvID, constant.MsgRevokeNotification, msgs[0].SessionType, &tips); notificationErr != nil {
if err := m.notificationSender.NotificationWithSesstionType(ctx, req.UserID, recvID, constant.MsgRevokeNotification, msgs[0].SessionType, &tips); err != nil {
return nil, err
}
if err = CallbackAfterRevokeMsg(ctx, m.config, req); err != nil {

@ -66,7 +66,7 @@ func (m *msgServer) sendMsgSuperGroupChat(
return nil, err
}
if modifyErr := callbackMsgModify(ctx, m.config, req); modifyErr != nil {
if err := callbackMsgModify(ctx, m.config, req); err != nil {
return nil, err
}
err = m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData)
@ -144,7 +144,7 @@ func (m *msgServer) sendMsgNotification(
}
func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) {
if verificationErr := m.messageVerification(ctx, req); verificationErr != nil {
if err := m.messageVerification(ctx, req); err != nil {
return nil, err
}
isSend := true
@ -169,10 +169,10 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
return nil, err
}
if modifyErr := callbackMsgModify(ctx, m.config, req); modifyErr != nil {
if err := callbackMsgModify(ctx, m.config, req); err != nil {
return nil, err
}
if mqErr := m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); mqErr != nil {
if err := m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
return nil, err
}

@ -72,7 +72,7 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg
if err != nil {
return err
}
if indexErr := mongo.CreateMsgIndex(); indexErr != nil {
if err := mongo.CreateMsgIndex(); err != nil {
return err
}
cacheModel := cache.NewMsgCacheModel(rdb, config)

@ -21,7 +21,6 @@ import (
"github.com/OpenIMSDK/protocol/msg"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
)

@ -17,15 +17,13 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/msg"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
)
func (m *msgServer) PullMessageBySeqs(

@ -18,10 +18,9 @@ import (
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/mongo"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
func isMessageHasReadEnabled(msgData *sdkws.MsgData, config *config.GlobalConfig) bool {

@ -25,7 +25,6 @@ import (
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/utils"
utils2 "github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)

@ -74,7 +74,7 @@ func (t *thirdServer) InitiateMultipartUpload(ctx context.Context, req *third.In
Group: req.Cause,
CreateTime: time.Now(),
}
if setObjectErr := t.s3dataBase.SetObject(ctx, obj); setObjectErr != nil {
if err := t.s3dataBase.SetObject(ctx, obj); err != nil {
return nil, err
}
return &third.InitiateMultipartUploadResp{
@ -256,10 +256,10 @@ func (t *thirdServer) CompleteFormData(ctx context.Context, req *third.CompleteF
return nil, errs.ErrArgs.Wrap("invalid id " + err.Error())
}
var mate FormDataMate
if unmarshalErr := json.Unmarshal(data, &mate); unmarshalErr != nil {
if err := json.Unmarshal(data, &mate); err != nil {
return nil, errs.ErrArgs.Wrap("invalid id " + err.Error())
}
if uploadErr := checkUploadName(ctx, mate.Name, t.config); uploadErr != nil {
if err := checkUploadName(ctx, mate.Name, t.config); err != nil {
return nil, err
}
info, err := t.s3dataBase.StatObject(ctx, mate.Key)

@ -17,7 +17,6 @@ package third
import (
"context"
"fmt"
"github.com/OpenIMSDK/tools/log"
"net/url"
"time"
@ -41,7 +40,6 @@ import (
)
func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
log.ZDebug(context.Background(), "config19999999999999999999999999999999999", config, "javadfdas")
mongo, err := unrelation.NewMongo(config)
if err != nil {
@ -59,7 +57,7 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg
if apiURL == "" {
return fmt.Errorf("api url is empty")
}
if _, parseErr := url.Parse(config.Object.ApiURL); parseErr != nil {
if _, err := url.Parse(config.Object.ApiURL); err != nil {
return err
}
if apiURL[len(apiURL)-1] != '/' {
@ -70,7 +68,7 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg
if err != nil {
return err
}
// 根据配置文件策略选择 oss 方式
// Select based on the configuration file strategy
enable := config.Object.Enable
var o s3.Interface
switch config.Object.Enable {

@ -22,11 +22,10 @@ import (
"strings"
"unicode/utf8"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/OpenIMSDK/protocol/third"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
)
func toPbMapArray(m map[string][]string) []*third.KeyValues {

@ -19,7 +19,6 @@ import (
pbuser "github.com/OpenIMSDK/protocol/user"
"github.com/OpenIMSDK/tools/utils"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/http"

@ -125,11 +125,11 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
if err != nil {
return nil, err
}
if callbackErr := CallbackBeforeUpdateUserInfo(ctx, s.config, req); callbackErr != nil {
if err := CallbackBeforeUpdateUserInfo(ctx, s.config, req); err != nil {
return nil, err
}
data := convert.UserPb2DBMap(req.UserInfo)
if updateErr := s.UpdateByMap(ctx, req.UserInfo.UserID, data); updateErr != nil {
if err := s.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil {
return nil, err
}
_ = s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID)

@ -20,11 +20,9 @@ import (
"time"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)
@ -58,9 +56,9 @@ import (
// continue
// }
// if len(seqs) > 0 {
// if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err
// if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err
// != nil {
// log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
// log.ZError(ctx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
// continue
// }
// if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil {
@ -139,8 +137,8 @@ func (c *MsgTool) ConversationsDestructMsgs() {
continue
}
if len(seqs) > 0 {
if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]any{"latest_msg_destruct_time": now}); err != nil {
log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]any{"latest_msg_destruct_time": now}); err != nil {
log.ZError(ctx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil {

@ -23,7 +23,6 @@ import (
"time"
"github.com/OpenIMSDK/tools/errs"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
@ -48,16 +47,16 @@ func StartTask(config *config.GlobalConfig) error {
// register cron tasks
var crontab = cron.New()
fmt.Println("start chatRecordsClearTime cron task", "cron config", config.ChatRecordsClearTime)
fmt.Printf("Start chatRecordsClearTime cron task, cron config: %s\n", config.ChatRecordsClearTime)
_, err = crontab.AddFunc(config.ChatRecordsClearTime, cronWrapFunc(config,rdb, "cron_clear_msg_and_fix_seq", msgTool.AllConversationClearMsgAndFixSeq))
if err != nil {
return errs.Wrap(err)
}
fmt.Println("start msgDestruct cron task", "cron config", config.MsgDestructTime)
fmt.Printf("Start msgDestruct cron task, cron config: %s\n", config.MsgDestructTime)
_, err = crontab.AddFunc(config.MsgDestructTime, cronWrapFunc(config,rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs))
if err != nil {
return errs.Wrap(err)
return errs.Wrap(err, "cron_conversations_destruct_msgs")
}
// start crontab

@ -18,32 +18,26 @@ import (
"context"
"fmt"
"math"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/tx"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
"github.com/redis/go-redis/v9"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"math/rand"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/mw"
"github.com/OpenIMSDK/tools/tx"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/redis/go-redis/v9"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type MsgTool struct {
@ -200,7 +194,8 @@ func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID strin
return err
}
if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
log.ZError(ctx, "cache max seq and mongo max seq is diff > 10", nil, "maxSeqMongo", maxSeqMongo, "minSeqMongo", minSeqMongo, "maxSeqCache", maxSeqCache, "conversationID", conversationID)
err = fmt.Errorf("cache max seq and mongo max seq is diff > 10, maxSeqMongo:%d,minSeqMongo:%d,maxSeqCache:%d,conversationID:%s", maxSeqMongo, minSeqMongo, maxSeqCache, conversationID)
return errs.Wrap(err)
}
return nil
}
@ -222,7 +217,6 @@ func (c *MsgTool) checkMaxSeq(ctx context.Context, conversationID string) error
func (c *MsgTool) FixAllSeq(ctx context.Context) error {
conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDs failed", err)
return err
}
for _, conversationID := range conversationIDs {

@ -18,7 +18,6 @@ import (
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
)

@ -67,6 +67,7 @@ type LocationElem struct {
Longitude float64 `mapstructure:"longitude" validate:"required"`
Latitude float64 `mapstructure:"latitude" validate:"required"`
}
type CustomElem struct {
Data string `mapstructure:"data" validate:"required"`
Description string `mapstructure:"description"`

@ -23,7 +23,6 @@ import (
"github.com/OpenIMSDK/tools/tokenverify"
"github.com/OpenIMSDK/tools/utils"
"github.com/golang-jwt/jwt/v4"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)

@ -16,7 +16,6 @@ package callbackstruct
import (
common "github.com/OpenIMSDK/protocol/sdkws"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
)

@ -15,6 +15,9 @@
package cmd
import (
"errors"
"fmt"
"github.com/OpenIMSDK/protocol/constant"
"github.com/openimsdk/open-im-server/v3/internal/api"
"github.com/spf13/cobra"
@ -35,6 +38,7 @@ func NewApiCmd() *ApiCmd {
return ret
}
// 原来代码
func (a *ApiCmd) addPreRun() {
a.Command.PreRun = func(cmd *cobra.Command, args []string) {
a.port = a.getPortFlag(cmd)
@ -48,11 +52,17 @@ func (a *ApiCmd) addRunE() {
}
}
func (a *ApiCmd) GetPortFromConfig(portType string) int {
func (a *ApiCmd) GetPortFromConfig(portType string) (int, error) {
if portType == constant.FlagPort {
return a.config.Api.OpenImApiPort[0]
if len(a.config.Api.OpenImApiPort) > 0 {
return a.config.Api.OpenImApiPort[0], nil
}
return 0, errors.New("API port configuration is empty or missing")
} else if portType == constant.FlagPrometheusPort {
return a.config.Prometheus.ApiPrometheusPort[0]
if len(a.config.Prometheus.ApiPrometheusPort) > 0 {
return a.config.Prometheus.ApiPrometheusPort[0], nil
}
return 0, errors.New("Prometheus port configuration is empty or missing")
}
return 0
return 0, fmt.Errorf("unknown port type: %s", portType)
}

@ -42,3 +42,7 @@ func (c *CronTaskCmd) addRunE() {
func (c *CronTaskCmd) Exec() error {
return c.Execute()
}
func (c *CronTaskCmd) GetPortFromConfig(portType string) (int, error) {
return 0, nil
}

@ -44,7 +44,7 @@ func TestMsgGatewayCmd_GetPortFromConfig(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.portType, func(t *testing.T) {
got := msgGatewayCmd.GetPortFromConfig(tt.portType)
got, _ := msgGatewayCmd.GetPortFromConfig(tt.portType)
assert.Equal(t, tt.want, got)
})
}

@ -15,9 +15,9 @@
package cmd
import (
"github.com/spf13/cobra"
"github.com/openimsdk/open-im-server/v3/internal/tools"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"github.com/spf13/cobra"
)
type MsgUtilsCmd struct {
@ -138,7 +138,7 @@ func (s *SeqCmd) GetSeqCmd() *cobra.Command {
s.Command.Run = func(cmdLines *cobra.Command, args []string) {
_, err := tools.InitMsgTool(s.MsgTool.Config)
if err != nil {
panic(err)
util.ExitWithError(err)
}
userID := s.getUserIDFlag(cmdLines)
superGroupID := s.getSuperGroupIDFlag(cmdLines)

@ -17,19 +17,18 @@ package cmd
import (
"fmt"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/spf13/cobra"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/spf13/cobra"
)
type RootCmdPt interface {
GetPortFromConfig(portType string) int
}
type RootCmd struct {
Command cobra.Command
Name string
@ -49,7 +48,7 @@ type CmdOpts struct {
func WithCronTaskLogName() func(*CmdOpts) {
return func(opts *CmdOpts) {
opts.loggerPrefixName = "openim.crontask.log.all"
opts.loggerPrefixName = "openim-crontask"
}
}
@ -82,7 +81,7 @@ func (rc *RootCmd) persistentPreRun(cmd *cobra.Command, opts ...func(*CmdOpts))
cmdOpts := rc.applyOptions(opts...)
if err := rc.initializeLogger(cmdOpts); err != nil {
return fmt.Errorf("failed to initialize from config: %w", err)
return errs.Wrap(err, "failed to initialize logger")
}
return nil
@ -119,7 +118,7 @@ func (rc *RootCmd) initializeLogger(cmdOpts *CmdOpts) error {
func defaultCmdOpts() *CmdOpts {
return &CmdOpts{
loggerPrefixName: "OpenIM.log.all",
loggerPrefixName: "openim-all",
}
}
@ -138,7 +137,8 @@ func (r *RootCmd) AddPortFlag() {
func (r *RootCmd) getPortFlag(cmd *cobra.Command) int {
port, err := cmd.Flags().GetInt(constant.FlagPort)
if err != nil {
fmt.Println("Error getting ws port flag:", err)
// Wrapping the error with additional context
return 0
}
if port == 0 {
port = r.PortFromConfig(constant.FlagPort)
@ -146,6 +146,7 @@ func (r *RootCmd) getPortFlag(cmd *cobra.Command) int {
return port
}
// // GetPortFlag returns the port flag.
func (r *RootCmd) GetPortFlag() int {
return r.port
}
@ -155,9 +156,12 @@ func (r *RootCmd) AddPrometheusPortFlag() {
}
func (r *RootCmd) getPrometheusPortFlag(cmd *cobra.Command) int {
port, _ := cmd.Flags().GetInt(constant.FlagPrometheusPort)
if port == 0 {
port, err := cmd.Flags().GetInt(constant.FlagPrometheusPort)
if err != nil || port == 0 {
port = r.PortFromConfig(constant.FlagPrometheusPort)
if err != nil {
return 0
}
}
return port
}
@ -180,10 +184,8 @@ func (r *RootCmd) AddCommand(cmds ...*cobra.Command) {
r.Command.AddCommand(cmds...)
}
func (r *RootCmd) GetPortFromConfig(portType string) int {
return 0
}
func (r *RootCmd) PortFromConfig(portType string) int {
return r.cmdItf.GetPortFromConfig(portType)
// Retrieve the port and cache it
port := r.cmdItf.GetPortFromConfig(portType)
return port
}

@ -83,6 +83,9 @@ func GetOptionsByNotification(cfg NotificationConf) msgprocessor.Options {
return opts
}
// initConfig loads configuration from a specified path into the provided config structure.
// If the specified config file does not exist, it attempts to load from the project's default "config" directory.
// It logs informative messages regarding the configuration path being used.
func initConfig(config any, configName, configFolderPath string) error {
configFolderPath = filepath.Join(configFolderPath, configName)
_, err := os.Stat(configFolderPath)

@ -31,7 +31,7 @@ func TestGetDefaultConfigPath(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := GetDefaultConfigPath(); got != tt.want {
if got, _ := GetDefaultConfigPath(); got != tt.want {
t.Errorf("GetDefaultConfigPath() = %v, want %v", got, tt.want)
}
})
@ -47,7 +47,7 @@ func TestGetProjectRoot(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := GetProjectRoot(); got != tt.want {
if got, _ := GetProjectRoot(); got != tt.want {
t.Errorf("GetProjectRoot() = %v, want %v", got, tt.want)
}
})

@ -19,15 +19,10 @@ import (
"github.com/OpenIMSDK/protocol/sdkws"
sdk "github.com/OpenIMSDK/protocol/sdkws"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)
func BlackDB2Pb(
ctx context.Context,
blackDBs []*relation.BlackModel,
f func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error),
) (blackPbs []*sdk.BlackInfo, err error) {
func BlackDB2Pb(ctx context.Context, blackDBs []*relation.BlackModel, f func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error)) (blackPbs []*sdk.BlackInfo, err error) {
if len(blackDBs) == 0 {
return nil, nil
}

@ -17,7 +17,6 @@ package convert
import (
"github.com/OpenIMSDK/protocol/conversation"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)

@ -20,7 +20,6 @@ import (
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)
@ -89,8 +88,7 @@ func FriendsDB2Pb(
}
func FriendRequestDB2Pb(
ctx context.Context,
func FriendRequestDB2Pb(ctx context.Context,
friendRequests []*relation.FriendRequestModel,
getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error),
) ([]*sdkws.FriendRequest, error) {

@ -19,7 +19,6 @@ import (
pbgroup "github.com/OpenIMSDK/protocol/group"
sdkws "github.com/OpenIMSDK/protocol/sdkws"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)

@ -17,7 +17,6 @@ package convert
import (
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
)

@ -18,7 +18,6 @@ import (
"time"
"github.com/OpenIMSDK/protocol/sdkws"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)

@ -19,9 +19,8 @@ import (
"time"
"github.com/dtm-labs/rockscache"
"github.com/redis/go-redis/v9"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/redis/go-redis/v9"
)
const (
@ -46,11 +45,7 @@ type BlackCacheRedis struct {
blackDB relationtb.BlackModelInterface
}
func NewBlackCacheRedis(
rdb redis.UniversalClient,
blackDB relationtb.BlackModelInterface,
options rockscache.Options,
) BlackCache {
func NewBlackCacheRedis(rdb redis.UniversalClient, blackDB relationtb.BlackModelInterface, options rockscache.Options) BlackCache {
rcClient := rockscache.NewClient(rdb, options)
return &BlackCacheRedis{

@ -20,12 +20,10 @@ import (
"strings"
"time"
"github.com/dtm-labs/rockscache"
"github.com/redis/go-redis/v9"
"github.com/OpenIMSDK/tools/utils"
"github.com/dtm-labs/rockscache"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/redis/go-redis/v9"
)
const (

@ -18,12 +18,10 @@ import (
"context"
"time"
"github.com/dtm-labs/rockscache"
"github.com/redis/go-redis/v9"
"github.com/OpenIMSDK/tools/utils"
"github.com/dtm-labs/rockscache"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/redis/go-redis/v9"
)
const (

@ -22,15 +22,11 @@ import (
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/dtm-labs/rockscache"
"github.com/redis/go-redis/v9"
"github.com/OpenIMSDK/tools/utils"
"github.com/dtm-labs/rockscache"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/redis/go-redis/v9"
)
const (

@ -22,12 +22,10 @@ import (
"strings"
"time"
"github.com/redis/go-redis/v9"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/mw/specialerror"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/redis/go-redis/v9"
)
var (
@ -77,8 +75,7 @@ func NewRedis(config *config.GlobalConfig) (redis.UniversalClient, error) {
defer cancel()
err = rdb.Ping(ctx).Err()
if err != nil {
uriFormat := "address:%s, username:%s, password:%s, clusterMode:%t, enablePipeline:%t"
errMsg := fmt.Sprintf(uriFormat, config.Redis.Address, config.Redis.Username,
errMsg := fmt.Sprintf("address:%s, username:%s, password:%s, clusterMode:%t, enablePipeline:%t", config.Redis.Address, config.Redis.Username,
config.Redis.Password, config.Redis.ClusterMode, config.Redis.EnablePipeline)
return nil, errs.Wrap(err, errMsg)
}
@ -99,9 +96,11 @@ func overrideConfigFromEnv(config *config.GlobalConfig) {
config.Redis.Address = strings.Split(envAddr, ",")
}
}
if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" {
config.Redis.Username = envUser
}
if envPass := os.Getenv("REDIS_PASSWORD"); envPass != "" {
config.Redis.Password = envPass
}

@ -20,22 +20,16 @@ import (
"strconv"
"time"
"golang.org/x/sync/errgroup"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/OpenIMSDK/tools/errs"
"github.com/gogo/protobuf/jsonpb"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/utils"
"github.com/gogo/protobuf/jsonpb"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
)
const (
@ -434,7 +428,7 @@ func (c *msgCache) PipeSetMessageToCache(ctx context.Context, conversationID str
for _, msg := range msgs {
s, err := msgprocessor.Pb2String(msg)
if err != nil {
return 0, errs.Wrap(err, "pb.marshal")
return 0, err
}
key := c.getMessageCacheKey(conversationID, msg.Seq)
@ -443,7 +437,7 @@ func (c *msgCache) PipeSetMessageToCache(ctx context.Context, conversationID str
results, err := pipe.Exec(ctx)
if err != nil {
return 0, errs.Wrap(err, "pipe.set")
return 0, errs.Wrap(err)
}
for _, res := range results {

@ -20,10 +20,9 @@ import (
"time"
"github.com/dtm-labs/rockscache"
"github.com/redis/go-redis/v9"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/s3"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/redis/go-redis/v9"
)
type ObjectCache interface {

@ -293,7 +293,7 @@ func (u *UserCacheRedis) refreshStatusOnline(ctx context.Context, userID string,
onlineStatus.UserID = userID
newjsonData, err := json.Marshal(&onlineStatus)
if err != nil {
return errs.Wrap(err)
return errs.Wrap(err, "json.Marshal failed")
}
_, err = u.rdb.HSet(ctx, key, userID, string(newjsonData)).Result()
if err != nil {

@ -18,22 +18,18 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/OpenIMSDK/tools/errs"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/golang-jwt/jwt/v4"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/tokenverify"
"github.com/golang-jwt/jwt/v4"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
)
type AuthDatabase interface {
// 结果为空 不返回错误
// If the result is empty, no error is returned.
GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error)
// 创建token
// Create token
CreateToken(ctx context.Context, userID string, platformID int) (string, error)
}
@ -48,16 +44,12 @@ func NewAuthDatabase(cache cache.MsgModel, accessSecret string, accessExpire int
return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire, config: config}
}
// 结果为空 不返回错误.
func (a *authDatabase) GetTokensWithoutError(
ctx context.Context,
userID string,
platformID int,
) (map[string]int, error) {
// If the result is empty.
func (a *authDatabase) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) {
return a.cache.GetTokensWithoutError(ctx, userID, platformID)
}
// 创建token.
// Create Token.
func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) {
tokens, err := a.cache.GetTokensWithoutError(ctx, userID, platformID)
if err != nil {
@ -81,7 +73,7 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
tokenString, err := token.SignedString([]byte(a.accessSecret))
if err != nil {
return "", errs.Wrap(err)
return "", errs.Wrap(err, "token.SignedString")
}
return tokenString, a.cache.AddTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken)
}

@ -17,24 +17,22 @@ package controller
import (
"context"
"github.com/OpenIMSDK/tools/pagination"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/pagination"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)
type BlackDatabase interface {
// Create 增加黑名单
// Create add BlackList
Create(ctx context.Context, blacks []*relation.BlackModel) (err error)
// Delete 删除黑名单
// Delete delete BlackList
Delete(ctx context.Context, blacks []*relation.BlackModel) (err error)
// FindOwnerBlacks 获取黑名单列表
// FindOwnerBlacks get BlackList list
FindOwnerBlacks(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (total int64, blacks []*relation.BlackModel, err error)
FindBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*relation.BlackModel, err error)
// CheckIn 检查user2是否在user1的黑名单列表中(inUser1Blacks==true) 检查user1是否在user2的黑名单列表中(inUser2Blacks==true)
// CheckIn Check whether user2 is in the black list of user1 (inUser1Blacks==true) Check whether user1 is in the black list of user2 (inUser2Blacks==true)
CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Blacks bool, inUser2Blacks bool, err error)
}
@ -47,7 +45,7 @@ func NewBlackDatabase(black relation.BlackModelInterface, cache cache.BlackCache
return &blackDatabase{black, cache}
}
// Create 增加黑名单.
// Create Add Blacklist.
func (b *blackDatabase) Create(ctx context.Context, blacks []*relation.BlackModel) (err error) {
if err := b.black.Create(ctx, blacks); err != nil {
return err
@ -55,7 +53,7 @@ func (b *blackDatabase) Create(ctx context.Context, blacks []*relation.BlackMode
return b.deleteBlackIDsCache(ctx, blacks)
}
// Delete 删除黑名单.
// Delete Delete Blacklist.
func (b *blackDatabase) Delete(ctx context.Context, blacks []*relation.BlackModel) (err error) {
if err := b.black.Delete(ctx, blacks); err != nil {
return err
@ -63,6 +61,7 @@ func (b *blackDatabase) Delete(ctx context.Context, blacks []*relation.BlackMode
return b.deleteBlackIDsCache(ctx, blacks)
}
// FindOwnerBlacks Get Blacklist List.
func (b *blackDatabase) deleteBlackIDsCache(ctx context.Context, blacks []*relation.BlackModel) (err error) {
cache := b.cache.NewCache()
for _, black := range blacks {
@ -71,16 +70,13 @@ func (b *blackDatabase) deleteBlackIDsCache(ctx context.Context, blacks []*relat
return cache.ExecDel(ctx)
}
// FindOwnerBlacks 获取黑名单列表.
// FindOwnerBlacks Get Blacklist List.
func (b *blackDatabase) FindOwnerBlacks(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (total int64, blacks []*relation.BlackModel, err error) {
return b.black.FindOwnerBlacks(ctx, ownerUserID, pagination)
}
// CheckIn 检查user2是否在user1的黑名单列表中(inUser1Blacks==true) 检查user1是否在user2的黑名单列表中(inUser2Blacks==true).
func (b *blackDatabase) CheckIn(
ctx context.Context,
userID1, userID2 string,
) (inUser1Blacks bool, inUser2Blacks bool, err error) {
// FindOwnerBlacks Get Blacklist List.
func (b *blackDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Blacks bool, inUser2Blacks bool, err error) {
userID1BlackIDs, err := b.cache.GetBlackIDs(ctx, userID1)
if err != nil {
return
@ -93,10 +89,12 @@ func (b *blackDatabase) CheckIn(
return utils.IsContain(userID2, userID1BlackIDs), utils.IsContain(userID1, userID2BlackIDs), nil
}
// FindBlackIDs Get Blacklist List.
func (b *blackDatabase) FindBlackIDs(ctx context.Context, ownerUserID string) (blackIDs []string, err error) {
return b.cache.GetBlackIDs(ctx, ownerUserID)
}
// FindBlackInfos Get Blacklist List.
func (b *blackDatabase) FindBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*relation.BlackModel, err error) {
return b.black.FindOwnerBlackInfos(ctx, ownerUserID, userIDs)
}

@ -18,46 +18,52 @@ import (
"context"
"time"
"github.com/OpenIMSDK/tools/pagination"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/pagination"
"github.com/OpenIMSDK/tools/tx"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
)
type ConversationDatabase interface {
// UpdateUserConversationFiled 更新用户该会话的属性信息
UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error
// CreateConversation 创建一批新的会话
// UpdateUsersConversationField updates the properties of a conversation for specified users.
UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error
// CreateConversation creates a batch of new conversations.
CreateConversation(ctx context.Context, conversations []*relationtb.ConversationModel) error
// SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作
// SyncPeerUserPrivateConversationTx ensures transactional operation while syncing private conversations between peers.
SyncPeerUserPrivateConversationTx(ctx context.Context, conversation []*relationtb.ConversationModel) error
// FindConversations 根据会话ID获取某个用户的多个会话
// FindConversations retrieves multiple conversations of a user by conversation IDs.
FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationtb.ConversationModel, error)
// FindRecvMsgNotNotifyUserIDs 获取超级大群开启免打扰的用户ID
//FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
// GetUserAllConversation 获取一个用户在服务器上所有的会话
// GetUserAllConversation fetches all conversations of a user on the server.
GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationtb.ConversationModel, error)
// SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性
// SetUserConversations sets multiple conversation properties for a user, creates new conversations if they do not exist, or updates them otherwise. This operation is atomic.
SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.ConversationModel) error
// SetUsersConversationFiledTx 设置多个用户会话关于某个字段的更新操作,如果会话不存在则创建,否则更新,内部保证事务操作
SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationtb.ConversationModel, filedMap map[string]any) error
// SetUsersConversationFieldTx updates a specific field for multiple users' conversations, creating new conversations if they do not exist, or updates them otherwise. This operation is
// transactional.
SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.ConversationModel, fieldMap map[string]any) error
// CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs.
CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error
// GetConversationIDs retrieves conversation IDs for a given user.
GetConversationIDs(ctx context.Context, userID string) ([]string, error)
// GetUserConversationIDsHash gets the hash of conversation IDs for a given user.
GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error)
// GetAllConversationIDs fetches all conversation IDs.
GetAllConversationIDs(ctx context.Context) ([]string, error)
// GetAllConversationIDsNumber returns the number of all conversation IDs.
GetAllConversationIDsNumber(ctx context.Context) (int64, error)
// PageConversationIDs paginates through conversation IDs based on the specified pagination settings.
PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error)
//GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
// GetConversationsByConversationID retrieves conversations by their IDs.
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error)
// GetConversationIDsNeedDestruct fetches conversations that need to be destructed based on specific criteria.
GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.ConversationModel, error)
// GetConversationNotReceiveMessageUserIDs gets user IDs for users in a conversation who have not received messages.
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
//GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
//FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
}
func NewConversationDatabase(conversation relationtb.ConversationModelInterface, cache cache.ConversationCache, tx tx.CtxTx) ConversationDatabase {
@ -74,7 +80,7 @@ type conversationDatabase struct {
tx tx.CtxTx
}
func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationtb.ConversationModel, filedMap map[string]any) (err error) {
func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.ConversationModel, fieldMap map[string]any) (err error) {
return c.tx.Transaction(ctx, func(ctx context.Context) error {
cache := c.cache.NewCache()
if conversation.GroupID != "" {
@ -85,22 +91,22 @@ func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context,
return err
}
if len(haveUserIDs) > 0 {
_, err = c.conversationDB.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, filedMap)
_, err = c.conversationDB.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, fieldMap)
if err != nil {
return err
}
cache = cache.DelUsersConversation(conversation.ConversationID, haveUserIDs...)
if _, ok := filedMap["has_read_seq"]; ok {
if _, ok := fieldMap["has_read_seq"]; ok {
for _, userID := range haveUserIDs {
cache = cache.DelUserAllHasReadSeqs(userID, conversation.ConversationID)
}
}
if _, ok := filedMap["recv_msg_opt"]; ok {
if _, ok := fieldMap["recv_msg_opt"]; ok {
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
}
}
NotUserIDs := utils.DifferenceString(haveUserIDs, userIDs)
log.ZDebug(ctx, "SetUsersConversationFiledTx", "NotUserIDs", NotUserIDs, "haveUserIDs", haveUserIDs, "userIDs", userIDs)
log.ZDebug(ctx, "SetUsersConversationFieldTx", "NotUserIDs", NotUserIDs, "haveUserIDs", haveUserIDs, "userIDs", userIDs)
var conversations []*relationtb.ConversationModel
now := time.Now()
for _, v := range NotUserIDs {
@ -123,7 +129,7 @@ func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context,
})
}
func (c *conversationDatabase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error {
func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error {
_, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args)
if err != nil {
return err

@ -16,17 +16,16 @@ package controller
import (
"context"
"fmt"
"time"
"github.com/OpenIMSDK/tools/pagination"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/pagination"
"github.com/OpenIMSDK/tools/tx"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)
@ -89,20 +88,30 @@ func NewFriendDatabase(friend relation.FriendModelInterface, friendRequest relat
return &friendDatabase{friend: friend, friendRequest: friendRequest, cache: cache, tx: tx}
}
// ok 检查user2是否在user1的好友列表中(inUser1Friends==true) 检查user1是否在user2的好友列表中(inUser2Friends==true).
// CheckIn verifies if user2 is in user1's friend list (inUser1Friends returns true) and
// if user1 is in user2's friend list (inUser2Friends returns true).
func (f *friendDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Friends bool, inUser2Friends bool, err error) {
// Retrieve friend IDs of userID1 from the cache
userID1FriendIDs, err := f.cache.GetFriendIDs(ctx, userID1)
if err != nil {
err = fmt.Errorf("error retrieving friend IDs for user %s: %w", userID1, err)
return
}
// Retrieve friend IDs of userID2 from the cache
userID2FriendIDs, err := f.cache.GetFriendIDs(ctx, userID2)
if err != nil {
err = fmt.Errorf("error retrieving friend IDs for user %s: %w", userID2, err)
return
}
return utils.IsContain(userID2, userID1FriendIDs), utils.IsContain(userID1, userID2FriendIDs), nil
// Check if userID2 is in userID1's friend list and vice versa
inUser1Friends = utils.IsContain(userID2, userID1FriendIDs)
inUser2Friends = utils.IsContain(userID1, userID2FriendIDs)
return inUser1Friends, inUser2Friends, nil
}
// 增加或者更新好友申请 如果之前有记录则更新,没有记录则新增.
// AddFriendRequest adds or updates a friend request.
func (f *friendDatabase) AddFriendRequest(ctx context.Context, fromUserID, toUserID string, reqMsg string, ex string) (err error) {
return f.tx.Transaction(ctx, func(ctx context.Context) error {
_, err := f.friendRequest.Take(ctx, fromUserID, toUserID)
@ -126,11 +135,11 @@ func (f *friendDatabase) AddFriendRequest(ctx context.Context, fromUserID, toUse
})
}
// (1)先判断是否在好友表 (在不在都不返回错误) (2)对于不在好友列表的 插入即可.
// (1) First determine whether it is in the friends list (in or out does not return an error) (2) for not in the friends list can be inserted.
func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, addSource int32) (err error) {
return f.tx.Transaction(ctx, func(ctx context.Context) error {
cache := f.cache.NewCache()
// 先find 找出重复的 去掉重复的
// User find friends
fs1, err := f.friend.FindFriends(ctx, ownerUserID, friendUserIDs)
if err != nil {
return err
@ -170,26 +179,37 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string,
})
}
// 拒绝好友申请 (1)检查是否有申请记录且为未处理状态 (没有记录返回错误) (2)修改申请记录 已拒绝.
func (f *friendDatabase) RefuseFriendRequest(ctx context.Context, friendRequest *relation.FriendRequestModel) (err error) {
// RefuseFriendRequest rejects a friend request. It first checks for an existing, unprocessed request.
// If no such request exists, it returns an error. Otherwise, it marks the request as refused.
func (f *friendDatabase) RefuseFriendRequest(ctx context.Context, friendRequest *relation.FriendRequestModel) error {
// Attempt to retrieve the friend request from the database.
fr, err := f.friendRequest.Take(ctx, friendRequest.FromUserID, friendRequest.ToUserID)
if err != nil {
return err
return fmt.Errorf("failed to retrieve friend request from %s to %s: %w", friendRequest.FromUserID, friendRequest.ToUserID, err)
}
// Check if the friend request has already been handled.
if fr.HandleResult != 0 {
return errs.ErrArgs.Wrap("the friend request has been processed")
return fmt.Errorf("friend request from %s to %s has already been processed", friendRequest.FromUserID, friendRequest.ToUserID)
}
log.ZDebug(ctx, "refuse friend request", "friendRequest db", fr, "friendRequest arg", friendRequest)
// Log the action of refusing the friend request for debugging and auditing purposes.
log.ZDebug(ctx, "Refusing friend request", map[string]interface{}{
"DB_FriendRequest": fr,
"Arg_FriendRequest": friendRequest,
})
// Mark the friend request as refused and update the handle time.
friendRequest.HandleResult = constant.FriendResponseRefuse
friendRequest.HandleTime = time.Now()
err = f.friendRequest.Update(ctx, friendRequest)
if err != nil {
return err
if err := f.friendRequest.Update(ctx, friendRequest); err != nil {
return fmt.Errorf("failed to update friend request from %s to %s as refused: %w", friendRequest.FromUserID, friendRequest.ToUserID, err)
}
return nil
}
// AgreeFriendRequest 同意好友申请 (1)检查是否有申请记录且为未处理状态 (没有记录返回错误) (2)检查是否好友(不返回错误) (3) 建立双向好友关系(存在的忽略).
// AgreeFriendRequest accepts a friend request. It first checks for an existing, unprocessed request.
func (f *friendDatabase) AgreeFriendRequest(ctx context.Context, friendRequest *relation.FriendRequestModel) (err error) {
return f.tx.Transaction(ctx, func(ctx context.Context) error {
defer log.ZDebug(ctx, "return line")
@ -227,10 +247,10 @@ func (f *friendDatabase) AgreeFriendRequest(ctx context.Context, friendRequest *
return err
}
existsMap := utils.SliceSet(utils.Slice(exists, func(friend *relation.FriendModel) [2]string {
return [...]string{friend.OwnerUserID, friend.FriendUserID} // 自己 - 好友
return [...]string{friend.OwnerUserID, friend.FriendUserID} // My - Friend
}))
var adds []*relation.FriendModel
if _, ok := existsMap[[...]string{friendRequest.ToUserID, friendRequest.FromUserID}]; !ok { // 自己 - 好友
if _, ok := existsMap[[...]string{friendRequest.ToUserID, friendRequest.FromUserID}]; !ok { // My - Friend
adds = append(
adds,
&relation.FriendModel{
@ -241,7 +261,7 @@ func (f *friendDatabase) AgreeFriendRequest(ctx context.Context, friendRequest *
},
)
}
if _, ok := existsMap[[...]string{friendRequest.FromUserID, friendRequest.ToUserID}]; !ok { // 好友 - 自己
if _, ok := existsMap[[...]string{friendRequest.FromUserID, friendRequest.ToUserID}]; !ok { // My - Friend
adds = append(
adds,
&relation.FriendModel{
@ -261,7 +281,7 @@ func (f *friendDatabase) AgreeFriendRequest(ctx context.Context, friendRequest *
})
}
// 删除好友 外部判断是否好友关系.
// Delete removes a friend relationship. It is assumed that the external caller has verified the friendship status.
func (f *friendDatabase) Delete(ctx context.Context, ownerUserID string, friendUserIDs []string) (err error) {
if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil {
return err
@ -269,7 +289,7 @@ func (f *friendDatabase) Delete(ctx context.Context, ownerUserID string, friendU
return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx)
}
// 更新好友备注 零值也支持.
// UpdateRemark updates the remark for a friend. Zero value for remark is also supported.
func (f *friendDatabase) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) (err error) {
if err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark); err != nil {
return err
@ -277,27 +297,27 @@ func (f *friendDatabase) UpdateRemark(ctx context.Context, ownerUserID, friendUs
return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx)
}
// 获取ownerUserID的好友列表 无结果不返回错误.
// PageOwnerFriends retrieves the list of friends for the ownerUserID. It does not return an error if the result is empty.
func (f *friendDatabase) PageOwnerFriends(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (total int64, friends []*relation.FriendModel, err error) {
return f.friend.FindOwnerFriends(ctx, ownerUserID, pagination)
}
// friendUserID在哪些人的好友列表中.
// PageInWhoseFriends identifies in whose friend lists the friendUserID appears.
func (f *friendDatabase) PageInWhoseFriends(ctx context.Context, friendUserID string, pagination pagination.Pagination) (total int64, friends []*relation.FriendModel, err error) {
return f.friend.FindInWhoseFriends(ctx, friendUserID, pagination)
}
// 获取我发出去的好友申请 无结果不返回错误.
// PageFriendRequestFromMe retrieves friend requests sent by me. It does not return an error if the result is empty.
func (f *friendDatabase) PageFriendRequestFromMe(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, friends []*relation.FriendRequestModel, err error) {
return f.friendRequest.FindFromUserID(ctx, userID, pagination)
}
// 获取我收到的的好友申请 无结果不返回错误.
// PageFriendRequestToMe retrieves friend requests received by me. It does not return an error if the result is empty.
func (f *friendDatabase) PageFriendRequestToMe(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, friends []*relation.FriendRequestModel, err error) {
return f.friendRequest.FindToUserID(ctx, userID, pagination)
}
// 获取某人指定好友的信息 如果有好友不存在,也返回错误.
// FindFriendsWithError retrieves specified friends' information for ownerUserID. Returns an error if any friend does not exist.
func (f *friendDatabase) FindFriendsWithError(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*relation.FriendModel, err error) {
friends, err = f.friend.FindFriends(ctx, ownerUserID, friendUserIDs)
if err != nil {

@ -18,60 +18,90 @@ import (
"context"
"time"
"github.com/OpenIMSDK/tools/pagination"
"github.com/dtm-labs/rockscache"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/pagination"
"github.com/OpenIMSDK/tools/tx"
"github.com/OpenIMSDK/tools/utils"
"github.com/redis/go-redis/v9"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/redis/go-redis/v9"
)
type GroupDatabase interface {
// Group
// CreateGroup creates new groups along with their members.
CreateGroup(ctx context.Context, groups []*relationtb.GroupModel, groupMembers []*relationtb.GroupMemberModel) error
// TakeGroup retrieves a single group by its ID.
TakeGroup(ctx context.Context, groupID string) (group *relationtb.GroupModel, err error)
// FindGroup retrieves multiple groups by their IDs.
FindGroup(ctx context.Context, groupIDs []string) (groups []*relationtb.GroupModel, err error)
// SearchGroup searches for groups based on a keyword and pagination settings, returns total count and groups.
SearchGroup(ctx context.Context, keyword string, pagination pagination.Pagination) (int64, []*relationtb.GroupModel, error)
// UpdateGroup updates the properties of a group identified by its ID.
UpdateGroup(ctx context.Context, groupID string, data map[string]any) error
DismissGroup(ctx context.Context, groupID string, deleteMember bool) error // 解散群,并删除群成员
// DismissGroup disbands a group and optionally removes its members based on the deleteMember flag.
DismissGroup(ctx context.Context, groupID string, deleteMember bool) error
// TakeGroupMember retrieves a specific group member by group ID and user ID.
TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationtb.GroupMemberModel, err error)
// TakeGroupOwner retrieves the owner of a group by group ID.
TakeGroupOwner(ctx context.Context, groupID string) (*relationtb.GroupMemberModel, error)
FindGroupMembers(ctx context.Context, groupID string, userIDs []string) (groupMembers []*relationtb.GroupMemberModel, err error) // *
FindGroupMemberUser(ctx context.Context, groupIDs []string, userID string) (groupMembers []*relationtb.GroupMemberModel, err error) // *
FindGroupMemberRoleLevels(ctx context.Context, groupID string, roleLevels []int32) (groupMembers []*relationtb.GroupMemberModel, err error) // *
FindGroupMemberAll(ctx context.Context, groupID string) (groupMembers []*relationtb.GroupMemberModel, err error) // *
// FindGroupMembers retrieves members of a group filtered by user IDs.
FindGroupMembers(ctx context.Context, groupID string, userIDs []string) (groupMembers []*relationtb.GroupMemberModel, err error)
// FindGroupMemberUser retrieves groups that a user is a member of, filtered by group IDs.
FindGroupMemberUser(ctx context.Context, groupIDs []string, userID string) (groupMembers []*relationtb.GroupMemberModel, err error)
// FindGroupMemberRoleLevels retrieves group members filtered by their role levels within a group.
FindGroupMemberRoleLevels(ctx context.Context, groupID string, roleLevels []int32) (groupMembers []*relationtb.GroupMemberModel, err error)
// FindGroupMemberAll retrieves all members of a group.
FindGroupMemberAll(ctx context.Context, groupID string) (groupMembers []*relationtb.GroupMemberModel, err error)
// FindGroupsOwner retrieves the owners for multiple groups.
FindGroupsOwner(ctx context.Context, groupIDs []string) ([]*relationtb.GroupMemberModel, error)
// FindGroupMemberUserID retrieves the user IDs of all members in a group.
FindGroupMemberUserID(ctx context.Context, groupID string) ([]string, error)
// FindGroupMemberNum retrieves the number of members in a group.
FindGroupMemberNum(ctx context.Context, groupID string) (uint32, error)
// FindUserManagedGroupID retrieves group IDs managed by a user.
FindUserManagedGroupID(ctx context.Context, userID string) (groupIDs []string, err error)
// PageGroupRequest paginates through group requests for specified groups.
PageGroupRequest(ctx context.Context, groupIDs []string, pagination pagination.Pagination) (int64, []*relationtb.GroupRequestModel, error)
// GetGroupRoleLevelMemberIDs retrieves user IDs of group members with a specific role level.
GetGroupRoleLevelMemberIDs(ctx context.Context, groupID string, roleLevel int32) ([]string, error)
// PageGetJoinGroup paginates through groups that a user has joined.
PageGetJoinGroup(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, totalGroupMembers []*relationtb.GroupMemberModel, err error)
// PageGetGroupMember paginates through members of a group.
PageGetGroupMember(ctx context.Context, groupID string, pagination pagination.Pagination) (total int64, totalGroupMembers []*relationtb.GroupMemberModel, err error)
// SearchGroupMember searches for group members based on a keyword, group ID, and pagination settings.
SearchGroupMember(ctx context.Context, keyword string, groupID string, pagination pagination.Pagination) (int64, []*relationtb.GroupMemberModel, error)
// HandlerGroupRequest processes a group join request with a specified result.
HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationtb.GroupMemberModel) error
// DeleteGroupMember removes specified users from a group.
DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error
// MapGroupMemberUserID maps group IDs to their members' simplified user IDs.
MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationtb.GroupSimpleUserID, error)
// MapGroupMemberNum maps group IDs to their member count.
MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error)
TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error // 转让群
// TransferGroupOwner transfers the ownership of a group to another user.
TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error
// UpdateGroupMember updates properties of a group member.
UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error
// UpdateGroupMembers batch updates properties of group members.
UpdateGroupMembers(ctx context.Context, data []*relationtb.BatchUpdateGroupMember) error
// GroupRequest
// CreateGroupRequest creates new group join requests.
CreateGroupRequest(ctx context.Context, requests []*relationtb.GroupRequestModel) error
// TakeGroupRequest retrieves a specific group join request.
TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationtb.GroupRequestModel, error)
// FindGroupRequests retrieves multiple group join requests.
FindGroupRequests(ctx context.Context, groupID string, userIDs []string) ([]*relationtb.GroupRequestModel, error)
// PageGroupRequestUser paginates through group join requests made by a user.
PageGroupRequestUser(ctx context.Context, userID string, pagination pagination.Pagination) (int64, []*relationtb.GroupRequestModel, error)
// 获取群总数
// CountTotal counts the total number of groups as of a certain date.
CountTotal(ctx context.Context, before *time.Time) (count int64, err error)
// 获取范围内群增量
// CountRangeEverydayTotal counts the daily group creation total within a specified date range.
CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error)
// DeleteGroupMemberHash deletes the hash entries for group members in specified groups.
DeleteGroupMemberHash(ctx context.Context, groupIDs []string) error
}

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save