fix: fix the comflect

pull/1987/head
luhaoling 2 years ago
commit 41eceaaf7a

@ -51,6 +51,15 @@
</p> </p>
## :busts_in_silhouette: Community
+ 💬 [Follow our Twitter account](https://twitter.com/founder_im63606)
+ 👫 [Join our Reddit](https://www.reddit.com/r/OpenIMessaging)
+ 🚀 [Join our Slack community](https://join.slack.com/t/openimsdk/shared_invite/zt-22720d66b-o_FvKxMTGXtcnnnHiMqe9Q)
+ :eyes: [Join our wechat (微信群)](https://openim-1253691595.cos.ap-nanjing.myqcloud.com/WechatIMG20.jpeg)
+ 📚 [OpenIM Community](https://github.com/OpenIMSDK/community)
+ 💕 [OpenIM Interest Group](https://github.com/Openim-sigs)
## Ⓜ️ About OpenIM ## Ⓜ️ About OpenIM
OpenIM is a service platform specifically designed for integrating chat, audio-video calls, notifications, and AI chatbots into applications. It provides a range of powerful APIs and Webhooks, enabling developers to easily incorporate these interactive features into their applications. OpenIM is not a standalone chat application, but rather serves as a platform to support other applications in achieving rich communication functionalities. The following diagram illustrates the interaction between AppServer, AppClient, OpenIMServer, and OpenIMSDK to explain in detail. OpenIM is a service platform specifically designed for integrating chat, audio-video calls, notifications, and AI chatbots into applications. It provides a range of powerful APIs and Webhooks, enabling developers to easily incorporate these interactive features into their applications. OpenIM is not a standalone chat application, but rather serves as a platform to support other applications in achieving rich communication functionalities. The following diagram illustrates the interaction between AppServer, AppClient, OpenIMServer, and OpenIMSDK to explain in detail.
@ -154,16 +163,6 @@ Before you start, please make sure your changes are in demand. The best for that
- [Manage backend and monitor deployment](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/prometheus-grafana.md) - [Manage backend and monitor deployment](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/prometheus-grafana.md)
- [Mac Developer Deployment Guide for OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/mac-developer-deployment-guide.md) - [Mac Developer Deployment Guide for OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/mac-developer-deployment-guide.md)
## :busts_in_silhouette: Community
+ 📚 [OpenIM Community](https://github.com/OpenIMSDK/community)
+ 💕 [OpenIM Interest Group](https://github.com/Openim-sigs)
+ 🚀 [Join our Slack community](https://join.slack.com/t/openimsdk/shared_invite/zt-22720d66b-o_FvKxMTGXtcnnnHiMqe9Q)
+ :eyes: [Join our wechat (微信群)](https://openim-1253691595.cos.ap-nanjing.myqcloud.com/WechatIMG20.jpeg)
+ 👫 [Join our Reddit](https://www.reddit.com/r/OpenIMessaging)
+ 💬 [Follow our Twitter account](https://twitter.com/openimsdk)
## :calendar: Community Meetings ## :calendar: Community Meetings
We want anyone to get involved in our community and contributing code, we offer gifts and rewards, and we welcome you to join us every Thursday night. We want anyone to get involved in our community and contributing code, we offer gifts and rewards, and we welcome you to join us every Thursday night.

@ -153,6 +153,13 @@ object:
accessKeySecret: '' accessKeySecret: ''
sessionToken: '' sessionToken: ''
publicRead: false publicRead: false
aws:
endpoint: ""
region: ""
bucket: "demo-9999999"
accessKeyID: ''
accessKeySecret: ''
publicRead: false
###################### RPC Port Configuration ###################### ###################### RPC Port Configuration ######################
# RPC service ports # RPC service ports

@ -153,6 +153,13 @@ object:
accessKeySecret: ${KODO_ACCESS_KEY_SECRET} accessKeySecret: ${KODO_ACCESS_KEY_SECRET}
sessionToken: ${KODO_SESSION_TOKEN} sessionToken: ${KODO_SESSION_TOKEN}
publicRead: ${KODO_PUBLIC_READ} publicRead: ${KODO_PUBLIC_READ}
aws:
endpoint: "${AWS_ENDPOINT}" # This might not be necessary unless you're using a custom endpoint
region: "${AWS_REGION}"
bucket: "${AWS_BUCKET}"
accessKeyID: ${AWS_ACCESS_KEY_ID}
accessKeySecret: ${AWS_SECRET_ACCESS_KEY}
publicRead: ${AWS_PUBLIC_READ}
###################### RPC Port Configuration ###################### ###################### RPC Port Configuration ######################
# RPC service ports # RPC service ports

@ -0,0 +1,186 @@
<p align="center">
<a href="https://openim.io">
<img src="../../assets/logo-gif/openim-logo.gif" width="60%" height="30%"/>
</a>
</p>
<div align="center">
[![Stars](https://img.shields.io/github/stars/openimsdk/open-im-server?style=for-the-badge&logo=github&colorB=ff69b4)](https://github.com/openimsdk/open-im-server/stargazers)
[![Forks](https://img.shields.io/github/forks/openimsdk/open-im-server?style=for-the-badge&logo=github&colorB=blue)](https://github.com/openimsdk/open-im-server/network/members)
[![Codecov](https://img.shields.io/codecov/c/github/openimsdk/open-im-server?style=for-the-badge&logo=codecov&colorB=orange)](https://app.codecov.io/gh/openimsdk/open-im-server)
[![Go Report Card](https://goreportcard.com/badge/github.com/openimsdk/open-im-server?style=for-the-badge)](https://goreportcard.com/report/github.com/openimsdk/open-im-server)
[![Go Reference](https://img.shields.io/badge/Go%20Reference-blue.svg?style=for-the-badge&logo=go&logoColor=white)](https://pkg.go.dev/github.com/openimsdk/open-im-server/v3)
[![License](https://img.shields.io/badge/license-Apache--2.0-green?style=for-the-badge)](https://github.com/openimsdk/open-im-server/blob/main/LICENSE)
[![Slack](https://img.shields.io/badge/Slack-500%2B-blueviolet?style=for-the-badge&logo=slack&logoColor=white)](https://join.slack.com/t/openimsdk/shared_invite/zt-22720d66b-o_FvKxMTGXtcnnnHiMqe9Q)
[![Best Practices](https://img.shields.io/badge/Best%20Practices-purple?style=for-the-badge)](https://www.bestpractices.dev/projects/8045)
[![Good First Issues](https://img.shields.io/github/issues/openimsdk/open-im-server/good%20first%20issue?style=for-the-badge&logo=github)](https://github.com/openimsdk/open-im-server/issues?q=is%3Aissue+is%3Aopen+sort%3Aupdated-desc+label%3A%22good+first+issue%22)
[![Language](https://img.shields.io/badge/Language-Go-blue.svg?style=for-the-badge&logo=go&logoColor=white)](https://golang.org/)
<p align="center">
<a href="../../README.md">Englist</a> ·
<a href="../../README_zh_CN.md">中文</a> ·
<a href="./README_uk.md">Українська</a> ·
<a href="./README_cs.md">Česky</a> ·
<a href="./README_hu.md">Magyar</a> ·
<a href="./README_es.md">Español</a> ·
<a href="./README_fa.md">فارسی</a> ·
<a href="./README_fr.md">Français</a> ·
<a href="./README_de.md">Deutsch</a> ·
<a href="./README_pl.md">Polski</a> ·
<a href="./README_id.md">Indonesian</a> ·
<a href="./README_fi.md">Suomi</a> ·
<a href="./README_ml.md">മലയാളം</a> ·
<a href="./README_ja.md">日本語</a> ·
<a href="./README_nl.md">Nederlands</a> ·
<a href="./README_it.md">Italiano</a> ·
<a href="./README_ru.md">Русский</a> ·
<a href="./README_pt_BR.md">Português (Brasil)</a> ·
<a href="./README_eo.md">Esperanto</a> ·
<a href="./README_ko.md">한국어</a> ·
<a href="./README_ar.md">العربي</a> ·
<a href="./README_vi.md">Tiếng Việt</a> ·
<a href="./README_da.md">Dansk</a> ·
<a href="./README_el.md">Ελληνικά</a> ·
<a href="./README_tr.md">Türkçe</a>
</p>
</div>
</p>
## Ⓜ️ Σχετικά με το OpenIM
Το OpenIM είναι μια πλατφόρμα υπηρεσιών σχεδιασμένη ειδικά για την ενσωμάτωση συνομιλίας, κλήσεων ήχου-βίντεο, ειδοποιήσεων και chatbots AI σε εφαρμογές. Παρέχει μια σειρά από ισχυρά API και Webhooks, επιτρέποντας στους προγραμματιστές να ενσωματώσουν εύκολα αυτές τις αλληλεπιδραστικές λειτουργίες στις εφαρμογές τους. Το OpenIM δεν είναι μια αυτόνομη εφαρμογή συνομιλίας, αλλά λειτουργεί ως πλατφόρμα υποστήριξης άλλων εφαρμογών για την επίτευξη πλούσιων λειτουργιών επικοινωνίας. Το παρακάτω διάγραμμα απεικονίζει την αλληλεπίδραση μεταξύ AppServer, AppClient, OpenIMServer και OpenIMSDK για να εξηγήσει αναλυτικά.
![App-OpenIM Relationship](../../docs/images/oepnim-design.png)
## 🚀 Σχετικά με το OpenIMSDK
Το **OpenIMSDK** είναι ένα SDK για αμεση ανταλλαγή μηνυμάτων σχεδιασμένο για το **OpenIMServer**, δημιουργήθηκε ειδικά για ενσωμάτωση σε εφαρμογές πελατών. Οι κύριες δυνατότητες και μονάδες του είναι οι εξής:
+ 🌟 Κύριες Δυνατότητες:
- 📦 Τοπική αποθήκευση
- 🔔 Callbacks ακροατών
- 🛡️ Περιτύλιγμα API
- 🌐 Διαχείριση σύνδεσης
+ 📚 Κύριες Μονάδες:
1. 🚀 Αρχικοποίηση και Σύνδεση
2. 👤 Διαχείριση Χρηστών
3. 👫 Διαχείριση Φίλων
4. 🤖 Λειτουργίες Ομάδας
5. 💬 Διαχείριση Συνομιλιών
Είναι κατασκευασμένο χρησιμοποιώντας Golang και υποστηρίζει διασταυρούμενη πλατφόρμα ανάπτυξης, διασφαλίζοντας μια συνεπή εμπειρία πρόσβασης σε όλες τις πλατφόρμες.
👉 **[Εξερευνήστε το GO SDK](https://github.com/openimsdk/openim-sdk-core)**
## 🌐 Σχετικά με το OpenIMServer
+ Το **OpenIMServer** έχει τις ακόλουθες χαρακτηριστικές:
- 🌐 Αρχιτεκτονική μικροϋπηρεσιών: Υποστηρίζει λειτουργία σε σύμπλεγμα, περιλαμβάνοντας έναν πύλη και πολλαπλές υπηρεσίες rpc.
- 🚀 Διάφοροι τρόποι ανάπτυξης: Υποστηρίζει ανάπτυξη μέσω πηγαίου κώδικα, Kubernetes, ή Docker.
- Υποστήριξη για τεράστια βάση χρηστών: Πολύ μεγάλες ομάδες με εκατοντάδες χιλιάδες χρήστες, δεκάδες εκατομμύρια χρήστες και δισεκατομμύρια μηνύματα.
### Ενισχυμένη Επιχειρηματική Λειτουργικότητα:
+ **REST API**: Το OpenIMServer προσφέρει REST APIs για επιχειρηματικά συστήματα, με στόχο την ενδυνάμωση των επιχειρήσεων με περισσότερες λειτουργικότητες, όπως η δημιουργία ομάδων και η αποστολή μηνυμάτων push μέσω backend διεπαφών.
+ **Webhooks**: Το OpenIMServer παρέχει δυνατότητες επανάκλησης για την επέκταση περισσότερων επιχειρηματικών μορφών. Μια επανάκληση σημαίνει ότι το OpenIMServer στέλνει ένα αίτημα στον επιχειρηματικό διακομιστή πριν ή μετά από ένα συγκεκριμένο γεγονός, όπως επανακλήσεις πριν ή μετά την αποστολή ενός μηνύματος.
👉 **[Μάθετε περισσότερα](https://docs.openim.io/guides/introduction/product)**
## :building_construction: Συνολική Αρχιτεκτονική
Εξερευνήστε σε βάθος τη λειτουργικότητα του Open-IM-Server με το διάγραμμα αρχιτεκτονικής μας.
![Overall Architecture](../../docs/images/architecture-layers.png)
## :rocket: Γρήγορη Εκκίνηση
Υποστηρίζουμε πολλές πλατφόρμες. Εδώ είναι οι διευθύνσεις για γρήγορη εμπειρία στην πλευρά του διαδικτύου:
👉 **[Διαδικτυακή επίδειξη του OpenIM](https://web-enterprise.rentsoft.cn/)**
🤲 Για να διευκολύνουμε την εμπειρία του χρήστη, προσφέρουμε διάφορες λύσεις ανάπτυξης. Μπορείτε να επιλέξετε τη μέθοδο ανάπτυξης σας από την παρακάτω λίστα:
+ **[Οδηγός Ανάπτυξης Κώδικα Πηγής](https://docs.openim.io/guides/gettingStarted/imSourceCodeDeployment)**
+ **[δηγός Ανάπτυξης μέσω Docker](https://docs.openim.io/guides/gettingStarted/dockerCompose)**
+ **[Οδηγός Ανάπτυξης Kubernetes](https://docs.openim.io/guides/gettingStarted/k8s-deployment)**
+ **[Οδηγός Ανάπτυξης για Αναπτυξιακούς στο Mac](https://docs.openim.io/guides/gettingstarted/mac-deployment-guide)**
## :hammer_and_wrench: Για να Αρχίσετε την Ανάπτυξη του OpenIM
[![Άνοιγμα σε Dev Container](https://img.shields.io/static/v1?label=Dev%20Container&message=Open&color=blue&logo=visualstudiocode)](https://vscode.dev/github/openimsdk/open-im-server)
OpenIM Στόχος μας είναι να δημιουργήσουμε μια κορυφαίου επιπέδου ανοιχτή πηγή κοινότητας. Διαθέτουμε ένα σύνολο προτύπων, στο [Αποθετήριο Κοινότητας](https://github.com/OpenIMSDK/community).
Εάν θέλετε να συνεισφέρετε σε αυτό το αποθετήριο Open-IM-Server, παρακαλούμε διαβάστε την [τεκμηρίωση συνεισφέροντος](https://github.com/openimsdk/open-im-server/blob/main/CONTRIBUTING.md).
Πριν ξεκινήσετε, παρακαλούμε βεβαιωθείτε ότι οι αλλαγές σας είναι ζητούμενες. Το καλύτερο για αυτό είναι να δημιουργήσετε ένα [νέα συζήτηση](https://github.com/openimsdk/open-im-server/discussions/new/choose) ή [Επικοινωνία Slack](https://join.slack.com/t/openimsdk/shared_invite/zt-22720d66b-o_FvKxMTGXtcnnnHiMqe9Q), ή αν βρείτε ένα ζήτημα, [αναφέρετέ το](https://github.com/openimsdk/open-im-server/issues/new/choose) πρώτα.
- [Αναφορά API του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/api.md)
- [Καταγραφή Bash του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/bash-log.md)
- [Ενέργειες CI/CD του OpenIMs](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/cicd-actions.md)
- [Συμβάσεις Κώδικα του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/code-conventions.md)
- [Οδηγίες Commit του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/commit.md)
- [Οδηγός Ανάπτυξης του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/development.md)
- [Δομή Καταλόγου του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/directory.md)
- [Ρύθμιση Περιβάλλοντος του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/environment.md)
- [Αναφορά Κωδικών Σφάλματος του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/error-code.md)
- [Ροή Εργασίας Git του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/git-workflow.md)
- [Οδηγός Cherry Pick του Git του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/gitcherry-pick.md)
- [Ροή Εργασίας GitHub του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/github-workflow.md)
- [Πρότυπα Κώδικα Go του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/go-code.md)
- [Οδηγίες Εικόνας του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/images.md)
- [Αρχική Διαμόρφωση του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/init-config.md)
- [Οδηγός Εγκατάστασης Docker του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/install-docker.md)
- [Οδηγός Εγκατάστασης Συστήματος Linux του Open](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/install-openim-linux-system.md)
- [Οδηγός Ανάπτυξης Linux του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/linux-development.md)
- [Οδηγός Τοπικών Δράσεων του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/local-actions.md)
- [Συμβάσεις Καταγραφής του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/logging.md)
- [Αποστολή Εκτός Σύνδεσης του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/offline-deployment.md)
- [Εργαλεία Protoc του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/protoc-tools.md)
- [Οδηγός Δοκιμών του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/test.md)
- [Χρησιμότητα Go του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/util-go.md)
- [Χρησιμότητες Makefile του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/util-makefile.md)
- [Χρησιμότητες Σεναρίου του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/util-scripts.md)
- [Έκδοση του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/version.md)
- [Διαχείριση backend και παρακολούθηση ανάπτυξης](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/prometheus-grafana.md)
- [Οδηγός Ανάπτυξης για Προγραμματιστές Mac του OpenIM](https://github.com/openimsdk/open-im-server/tree/main/docs/contrib/mac-developer-deployment-guide.md)
## :busts_in_silhouette: Κοινότητα
+ 📚 [Κοινότητα OpenIM](https://github.com/OpenIMSDK/community)
+ 💕 [Ομάδα Ενδιαφέροντος OpenIM](https://github.com/Openim-sigs)
+ 🚀 [Εγγραφείτε στην κοινότητα Slack μας](https://join.slack.com/t/openimsdk/shared_invite/zt-22720d66b-o_FvKxMTGXtcnnnHiMqe9Q)
+ :eyes: [γγραφείτε στην ομάδα μας wechat (微信群)](https://openim-1253691595.cos.ap-nanjing.myqcloud.com/WechatIMG20.jpeg)
## :calendar: Συναντήσεις της κοινότητας
Θέλουμε οποιονδήποτε να εμπλακεί στην κοινότητά μας και να συνεισφέρει κώδικα. Προσφέρουμε δώρα και ανταμοιβές και σας καλωσορίζουμε να μας ενταχθείτε κάθε Πέμπτη βράδυ.
Η διάσκεψή μας είναι στο [OpenIM Slack](https://join.slack.com/t/openimsdk/shared_invite/zt-22720d66b-o_FvKxMTGXtcnnnHiMqe9Q) 🎯, στη συνέχεια μπορείτε να αναζητήσετε τη διαδικασία Open-IM-Server για να συμμετάσχετε
Κάνουμε σημειώσεις για κάθε μια [Σημειώνουμε κάθε διμηνιαία συνάντηση](https://github.com/orgs/OpenIMSDK/discussions/categories/meeting) στις [συζητήσεις του GitHub](https://github.com/openimsdk/open-im-server/discussions/categories/meeting), Οι ιστορικές μας σημειώσεις συναντήσεων, καθώς και οι επαναλήψεις των συναντήσεων είναι διαθέσιμες στο[Έγγραφα της Google :bookmark_tabs:](https://docs.google.com/document/d/1nx8MDpuG74NASx081JcCpxPgDITNTpIIos0DS6Vr9GU/edit?usp=sharing).
## :eyes: Ποιοί Χρησιμοποιούν το OpenIM
Ελέγξτε τη σελίδα με τις [μελέτες περίπτωσης χρήσης ](https://github.com/OpenIMSDK/community/blob/main/ADOPTERS.md) μας για μια λίστα των χρηστών του έργου. Μην διστάσετε να αφήσετε ένα[📝σχόλιο](https://github.com/openimsdk/open-im-server/issues/379) και να μοιραστείτε την περίπτωση χρήσης σας.
## :page_facing_up: Άδεια Χρήσης
Το OpenIM διατίθεται υπό την άδεια Apache 2.0. Δείτε τη [ΑΔΕΙΑ ΧΡΗΣΗΣ](https://github.com/openimsdk/open-im-server/tree/main/LICENSE) για το πλήρες κείμενο της άδειας.
Το λογότυπο του OpenIM, συμπεριλαμβανομένων των παραλλαγών και των κινούμενων εικόνων, που εμφανίζονται σε αυτό το αποθετήριο[OpenIM](https://github.com/openimsdk/open-im-server) υπό τις διευθύνσεις [assets/logo](../../assets/logo) και [assets/logo-gif](../../assets/logo-gif) προστατεύονται από τους νόμους περί πνευματικής ιδιοκτησίας.
## 🔮 Ευχαριστούμε τους συνεισφέροντες μας!
<a href="https://github.com/openimsdk/open-im-server/graphs/contributors">
<img src="https://contrib.rocks/image?repo=openimsdk/open-im-server" />
</a>

@ -5,7 +5,7 @@ go 1.19
require ( require (
firebase.google.com/go v3.13.0+incompatible firebase.google.com/go v3.13.0+incompatible
github.com/OpenIMSDK/protocol v0.0.55 github.com/OpenIMSDK/protocol v0.0.55
github.com/OpenIMSDK/tools v0.0.35 github.com/OpenIMSDK/tools v0.0.36
github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/dtm-labs/rockscache v0.1.1 github.com/dtm-labs/rockscache v0.1.1
github.com/gin-gonic/gin v1.9.1 github.com/gin-gonic/gin v1.9.1
@ -36,6 +36,7 @@ require github.com/google/uuid v1.6.0
require ( require (
github.com/IBM/sarama v1.42.2 github.com/IBM/sarama v1.42.2
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/aws/aws-sdk-go v1.49.21
github.com/go-redis/redis v6.15.9+incompatible github.com/go-redis/redis v6.15.9+incompatible
github.com/redis/go-redis/v9 v9.4.0 github.com/redis/go-redis/v9 v9.4.0
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.5
@ -94,6 +95,7 @@ require (
github.com/jinzhu/copier v0.3.5 // indirect github.com/jinzhu/copier v0.3.5 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect github.com/json-iterator/go v1.1.12 // indirect
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd // indirect github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd // indirect
github.com/klauspost/compress v1.17.4 // indirect github.com/klauspost/compress v1.17.4 // indirect

@ -20,8 +20,8 @@ github.com/IBM/sarama v1.42.2 h1:VoY4hVIZ+WQJ8G9KNY/SQlWguBQXQ9uvFPOnrcu8hEw=
github.com/IBM/sarama v1.42.2/go.mod h1:FLPGUGwYqEs62hq2bVG6Io2+5n+pS6s/WOXVKWSLFtE= github.com/IBM/sarama v1.42.2/go.mod h1:FLPGUGwYqEs62hq2bVG6Io2+5n+pS6s/WOXVKWSLFtE=
github.com/OpenIMSDK/protocol v0.0.55 h1:eBjg8DyuhxGmuCUjpoZjg6MJJJXU/xJ3xJwFhrn34yA= github.com/OpenIMSDK/protocol v0.0.55 h1:eBjg8DyuhxGmuCUjpoZjg6MJJJXU/xJ3xJwFhrn34yA=
github.com/OpenIMSDK/protocol v0.0.55/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/protocol v0.0.55/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.35 h1:YH8UYoaErXqfNrwpUvQxe8nhL++gFH6qCisQPyzk0w8= github.com/OpenIMSDK/tools v0.0.36 h1:BT0q64l4f3QJDW16Rc0uJYt1gQFkiPoUQYQ33vo0EcE=
github.com/OpenIMSDK/tools v0.0.35/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE= github.com/OpenIMSDK/tools v0.0.36/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs=
@ -31,6 +31,8 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 h1:kFOfPq6dUM1hTo
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aws/aws-sdk-go v1.49.21 h1:Rl8KW6HqkwzhATwvXhyr7vD4JFUMi7oXGAw9SrxxIFY=
github.com/aws/aws-sdk-go v1.49.21/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
@ -203,6 +205,10 @@ github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
@ -532,6 +538,7 @@ gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME=
gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

@ -22,6 +22,8 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/OpenIMSDK/tools/errs"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
@ -76,16 +78,17 @@ type Client struct {
token string token string
} }
func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client { // function not used
return &Client{ // func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client {
w: new(sync.Mutex), // return &Client{
conn: conn, // w: new(sync.Mutex),
PlatformID: utils.StringToInt(ctx.GetPlatformID()), // conn: conn,
IsCompress: isCompress, // PlatformID: utils.StringToInt(ctx.GetPlatformID()),
UserID: ctx.GetUserID(), // IsCompress: isCompress,
ctx: ctx, // UserID: ctx.GetUserID(),
} // ctx: ctx,
} // }
// }
// ResetClient updates the client's state with new connection and context information. // ResetClient updates the client's state with new connection and context information.
func (c *Client) ResetClient( func (c *Client) ResetClient(
@ -173,7 +176,7 @@ func (c *Client) handleMessage(message []byte) error {
var err error var err error
message, err = c.longConnServer.DecompressWithPool(message) message, err = c.longConnServer.DecompressWithPool(message)
if err != nil { if err != nil {
return utils.Wrap(err, "") return errs.Wrap(err)
} }
} }
@ -182,15 +185,15 @@ func (c *Client) handleMessage(message []byte) error {
err := c.longConnServer.Decode(message, binaryReq) err := c.longConnServer.Decode(message, binaryReq)
if err != nil { if err != nil {
return utils.Wrap(err, "") return errs.Wrap(err)
} }
if err := c.longConnServer.Validate(binaryReq); err != nil { if err := c.longConnServer.Validate(binaryReq); err != nil {
return utils.Wrap(err, "") return errs.Wrap(err)
} }
if binaryReq.SendID != c.UserID { if binaryReq.SendID != c.UserID {
return utils.Wrap(errors.New("exception conn userID not same to req userID"), binaryReq.String()) return errs.Wrap(errors.New("exception conn userID not same to req userID"), binaryReq.String())
} }
ctx := mcontext.WithMustInfoCtx( ctx := mcontext.WithMustInfoCtx(
@ -313,7 +316,7 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
encodedBuf, err := c.longConnServer.Encode(resp) encodedBuf, err := c.longConnServer.Encode(resp)
if err != nil { if err != nil {
return utils.Wrap(err, "") return errs.Wrap(err)
} }
c.w.Lock() c.w.Lock()
@ -323,7 +326,7 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
if c.IsCompress { if c.IsCompress {
resultBuf, compressErr := c.longConnServer.CompressWithPool(encodedBuf) resultBuf, compressErr := c.longConnServer.CompressWithPool(encodedBuf)
if compressErr != nil { if compressErr != nil {
return utils.Wrap(compressErr, "") return errs.Wrap(compressErr)
} }
return c.conn.WriteMessage(MessageBinary, resultBuf) return c.conn.WriteMessage(MessageBinary, resultBuf)
} }
@ -341,7 +344,7 @@ func (c *Client) writePongMsg() error {
err := c.conn.SetWriteDeadline(writeWait) err := c.conn.SetWriteDeadline(writeWait)
if err != nil { if err != nil {
return utils.Wrap(err, "") return errs.Wrap(err)
} }
return c.conn.WriteMessage(PongMessage, nil) return c.conn.WriteMessage(PongMessage, nil)

@ -21,7 +21,7 @@ import (
"io" "io"
"sync" "sync"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/errs"
) )
var ( var (
@ -47,10 +47,10 @@ func (g *GzipCompressor) Compress(rawData []byte) ([]byte, error) {
gzipBuffer := bytes.Buffer{} gzipBuffer := bytes.Buffer{}
gz := gzip.NewWriter(&gzipBuffer) gz := gzip.NewWriter(&gzipBuffer)
if _, err := gz.Write(rawData); err != nil { if _, err := gz.Write(rawData); err != nil {
return nil, utils.Wrap(err, "") return nil, errs.Wrap(err)
} }
if err := gz.Close(); err != nil { if err := gz.Close(); err != nil {
return nil, utils.Wrap(err, "") return nil, errs.Wrap(err)
} }
return gzipBuffer.Bytes(), nil return gzipBuffer.Bytes(), nil
} }
@ -63,10 +63,10 @@ func (g *GzipCompressor) CompressWithPool(rawData []byte) ([]byte, error) {
gz.Reset(&gzipBuffer) gz.Reset(&gzipBuffer)
if _, err := gz.Write(rawData); err != nil { if _, err := gz.Write(rawData); err != nil {
return nil, utils.Wrap(err, "") return nil, errs.Wrap(err)
} }
if err := gz.Close(); err != nil { if err := gz.Close(); err != nil {
return nil, utils.Wrap(err, "") return nil, errs.Wrap(err)
} }
return gzipBuffer.Bytes(), nil return gzipBuffer.Bytes(), nil
} }
@ -75,11 +75,11 @@ func (g *GzipCompressor) DeCompress(compressedData []byte) ([]byte, error) {
buff := bytes.NewBuffer(compressedData) buff := bytes.NewBuffer(compressedData)
reader, err := gzip.NewReader(buff) reader, err := gzip.NewReader(buff)
if err != nil { if err != nil {
return nil, utils.Wrap(err, "NewReader failed") return nil, errs.Wrap(err, "NewReader failed")
} }
compressedData, err = io.ReadAll(reader) compressedData, err = io.ReadAll(reader)
if err != nil { if err != nil {
return nil, utils.Wrap(err, "ReadAll failed") return nil, errs.Wrap(err, "ReadAll failed")
} }
_ = reader.Close() _ = reader.Close()
return compressedData, nil return compressedData, nil
@ -88,18 +88,18 @@ func (g *GzipCompressor) DeCompress(compressedData []byte) ([]byte, error) {
func (g *GzipCompressor) DecompressWithPool(compressedData []byte) ([]byte, error) { func (g *GzipCompressor) DecompressWithPool(compressedData []byte) ([]byte, error) {
reader := gzipReaderPool.Get().(*gzip.Reader) reader := gzipReaderPool.Get().(*gzip.Reader)
if reader == nil { if reader == nil {
return nil, errors.New("NewReader failed") return nil, errs.Wrap(errors.New("NewReader failed"))
} }
defer gzipReaderPool.Put(reader) defer gzipReaderPool.Put(reader)
err := reader.Reset(bytes.NewReader(compressedData)) err := reader.Reset(bytes.NewReader(compressedData))
if err != nil { if err != nil {
return nil, utils.Wrap(err, "NewReader failed") return nil, errs.Wrap(err, "NewReader failed")
} }
compressedData, err = io.ReadAll(reader) compressedData, err = io.ReadAll(reader)
if err != nil { if err != nil {
return nil, utils.Wrap(err, "ReadAll failed") return nil, errs.Wrap(err, "ReadAll failed")
} }
_ = reader.Close() _ = reader.Close()
return compressedData, nil return compressedData, nil

@ -18,7 +18,7 @@ import (
"bytes" "bytes"
"encoding/gob" "encoding/gob"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/errs"
) )
type Encoder interface { type Encoder interface {
@ -47,7 +47,7 @@ func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error {
dec := gob.NewDecoder(buff) dec := gob.NewDecoder(buff)
err := dec.Decode(decodeData) err := dec.Decode(decodeData)
if err != nil { if err != nil {
return utils.Wrap(err, "") return errs.Wrap(err)
} }
return nil return nil
} }

@ -108,10 +108,12 @@ func (d *GWebSocket) Dial(urlStr string, requestHeader http.Header) (*http.Respo
} }
func (d *GWebSocket) IsNil() bool { func (d *GWebSocket) IsNil() bool {
if d.conn != nil { return d.conn == nil
return false //
} // if d.conn != nil {
return true // return false
// }
// return true
} }
func (d *GWebSocket) SetConnNil() { func (d *GWebSocket) SetConnNil() {

@ -61,11 +61,12 @@ type LongConnServer interface {
MessageHandler MessageHandler
} }
var bufferPool = sync.Pool{ // bufferPool is unused
New: func() any { // var bufferPool = sync.Pool{
return make([]byte, 1024) // New: func() any {
}, // return make([]byte, 1024)
} // },
// }
type WsServer struct { type WsServer struct {
globalConfig *config.GlobalConfig globalConfig *config.GlobalConfig

@ -58,12 +58,12 @@ func (u *UserMap) Get(key string, platformID int) ([]*Client, bool, bool) {
func (u *UserMap) Set(key string, v *Client) { func (u *UserMap) Set(key string, v *Client) {
allClients, existed := u.m.Load(key) allClients, existed := u.m.Load(key)
if existed { if existed {
log.ZDebug(context.Background(), "Set existed", "user_id", key, "client", *v) log.ZDebug(context.Background(), "Set existed", "user_id", key, "client_user_id", v.UserID)
oldClients := allClients.([]*Client) oldClients := allClients.([]*Client)
oldClients = append(oldClients, v) oldClients = append(oldClients, v)
u.m.Store(key, oldClients) u.m.Store(key, oldClients)
} else { } else {
log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client", *v) log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client_user_id", v.UserID)
var clients []*Client var clients []*Client
clients = append(clients, v) clients = append(clients, v)
u.m.Store(key, clients) u.m.Store(key, clients)

@ -71,7 +71,7 @@ func StartTransfer(config *config.GlobalConfig, prometheusPort int) error {
return err return err
} }
if err := client.CreateRpcRootNodes(config.GetServiceNames()); err != nil { if err2 := client.CreateRpcRootNodes(config.GetServiceNames()); err2 != nil {
return err return err
} }
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))

@ -74,10 +74,10 @@ type OnlineHistoryRedisConsumerHandler struct {
chArrays [ChannelNum]chan Cmd2Value chArrays [ChannelNum]chan Cmd2Value
msgDistributionCh chan Cmd2Value msgDistributionCh chan Cmd2Value
singleMsgSuccessCount uint64 // singleMsgSuccessCount uint64
singleMsgFailedCount uint64 // singleMsgFailedCount uint64
singleMsgSuccessCountMutex sync.Mutex // singleMsgSuccessCountMutex sync.Mutex
singleMsgFailedCountMutex sync.Mutex // singleMsgFailedCountMutex sync.Mutex
msgDatabase controller.CommonMsgDatabase msgDatabase controller.CommonMsgDatabase
conversationRpcClient *rpcclient.ConversationRpcClient conversationRpcClient *rpcclient.ConversationRpcClient
@ -130,9 +130,7 @@ func NewOnlineHistoryRedisConsumerHandler(
} }
func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
for { for cmd := range och.chArrays[channelID] {
select {
case cmd := <-och.chArrays[channelID]:
switch cmd.Cmd { switch cmd.Cmd {
case SourceMessages: case SourceMessages:
msgChannelValue := cmd.Value.(MsgChannelValue) msgChannelValue := cmd.Value.(MsgChannelValue)
@ -189,7 +187,6 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
} }
} }
} }
}
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,. // 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,.
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList( func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(

@ -34,8 +34,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
http2 "github.com/openimsdk/open-im-server/v3/pkg/common/http" http2 "github.com/openimsdk/open-im-server/v3/pkg/common/http"
"github.com/OpenIMSDK/tools/utils"
) )
var ( var (
@ -142,7 +140,7 @@ func (g *Client) GetTaskID(ctx context.Context, token string, pushReq PushReq) (
pushReq.Settings = &Settings{TTL: &ttl} pushReq.Settings = &Settings{TTL: &ttl}
err := g.request(ctx, taskURL, pushReq, token, &respTask) err := g.request(ctx, taskURL, pushReq, token, &respTask)
if err != nil { if err != nil {
return "", utils.Wrap(err, "") return "", errs.Wrap(err)
} }
return respTask.TaskID, nil return respTask.TaskID, nil
} }

@ -29,7 +29,6 @@ import (
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/tokenverify" "github.com/OpenIMSDK/tools/tokenverify"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
@ -108,7 +107,7 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR
func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) { func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) {
claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret(s.config.Secret)) claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret(s.config.Secret))
if err != nil { if err != nil {
return nil, utils.Wrap(err, "") return nil, errs.Wrap(err)
} }
m, err := s.authDatabase.GetTokensWithoutError(ctx, claims.UserID, claims.PlatformID) m, err := s.authDatabase.GetTokensWithoutError(ctx, claims.UserID, claims.PlatformID)
if err != nil { if err != nil {
@ -124,7 +123,7 @@ func (s *authServer) parseToken(ctx context.Context, tokensString string) (claim
case constant.KickedToken: case constant.KickedToken:
return nil, errs.ErrTokenKicked.Wrap() return nil, errs.ErrTokenKicked.Wrap()
default: default:
return nil, utils.Wrap(errs.ErrTokenUnknown, "") return nil, errs.Wrap(errs.ErrTokenUnknown)
} }
} }
return nil, errs.ErrTokenNotExist.Wrap() return nil, errs.ErrTokenNotExist.Wrap()

@ -957,7 +957,7 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf
return nil, err return nil, err
} }
if group.Status == constant.GroupStatusDismissed { if group.Status == constant.GroupStatusDismissed {
return nil, utils.Wrap(errs.ErrDismissedAlready, "") return nil, errs.Wrap(errs.ErrDismissedAlready)
} }
resp := &pbgroup.SetGroupInfoResp{} resp := &pbgroup.SetGroupInfoResp{}
count, err := s.db.FindGroupMemberNum(ctx, group.GroupID) count, err := s.db.FindGroupMemberNum(ctx, group.GroupID)

@ -38,19 +38,19 @@ func (m *MsgUtilsCmd) AddFixAllFlag() {
m.Command.PersistentFlags().BoolP("fixAll", "f", false, "openIM fix all seqs") m.Command.PersistentFlags().BoolP("fixAll", "f", false, "openIM fix all seqs")
} }
func (m *MsgUtilsCmd) getFixAllFlag(cmdLines *cobra.Command) bool { /* func (m *MsgUtilsCmd) getFixAllFlag(cmdLines *cobra.Command) bool {
fixAll, _ := cmdLines.Flags().GetBool("fixAll") fixAll, _ := cmdLines.Flags().GetBool("fixAll")
return fixAll return fixAll
} } */
func (m *MsgUtilsCmd) AddClearAllFlag() { func (m *MsgUtilsCmd) AddClearAllFlag() {
m.Command.PersistentFlags().BoolP("clearAll", "c", false, "openIM clear all seqs") m.Command.PersistentFlags().BoolP("clearAll", "c", false, "openIM clear all seqs")
} }
func (m *MsgUtilsCmd) getClearAllFlag(cmdLines *cobra.Command) bool { /* func (m *MsgUtilsCmd) getClearAllFlag(cmdLines *cobra.Command) bool {
clearAll, _ := cmdLines.Flags().GetBool("clearAll") clearAll, _ := cmdLines.Flags().GetBool("clearAll")
return clearAll return clearAll
} } */
func (m *MsgUtilsCmd) AddSuperGroupIDFlag() { func (m *MsgUtilsCmd) AddSuperGroupIDFlag() {
m.Command.PersistentFlags().StringP("superGroupID", "g", "", "openIM superGroupID") m.Command.PersistentFlags().StringP("superGroupID", "g", "", "openIM superGroupID")
@ -65,19 +65,19 @@ func (m *MsgUtilsCmd) AddBeginSeqFlag() {
m.Command.PersistentFlags().Int64P("beginSeq", "b", 0, "openIM beginSeq") m.Command.PersistentFlags().Int64P("beginSeq", "b", 0, "openIM beginSeq")
} }
func (m *MsgUtilsCmd) getBeginSeqFlag(cmdLines *cobra.Command) int64 { /* func (m *MsgUtilsCmd) getBeginSeqFlag(cmdLines *cobra.Command) int64 {
beginSeq, _ := cmdLines.Flags().GetInt64("beginSeq") beginSeq, _ := cmdLines.Flags().GetInt64("beginSeq")
return beginSeq return beginSeq
} } */
func (m *MsgUtilsCmd) AddLimitFlag() { func (m *MsgUtilsCmd) AddLimitFlag() {
m.Command.PersistentFlags().Int64P("limit", "l", 0, "openIM limit") m.Command.PersistentFlags().Int64P("limit", "l", 0, "openIM limit")
} }
func (m *MsgUtilsCmd) getLimitFlag(cmdLines *cobra.Command) int64 { /* func (m *MsgUtilsCmd) getLimitFlag(cmdLines *cobra.Command) int64 {
limit, _ := cmdLines.Flags().GetInt64("limit") limit, _ := cmdLines.Flags().GetInt64("limit")
return limit return limit
} } */
func (m *MsgUtilsCmd) Execute() error { func (m *MsgUtilsCmd) Execute() error {
return m.Command.Execute() return m.Command.Execute()

@ -164,6 +164,14 @@ type GlobalConfig struct {
SessionToken string `yaml:"sessionToken"` SessionToken string `yaml:"sessionToken"`
PublicRead bool `yaml:"publicRead"` PublicRead bool `yaml:"publicRead"`
} `yaml:"kodo"` } `yaml:"kodo"`
Aws struct {
Endpoint string `yaml:"endpoint"`
Region string `yaml:"region"`
Bucket string `yaml:"bucket"`
AccessKeyID string `yaml:"accessKeyID"`
AccessKeySecret string `yaml:"accessKeySecret"`
PublicRead bool `yaml:"publicRead"`
} `yaml:"aws"`
} `yaml:"object"` } `yaml:"object"`
RpcPort struct { RpcPort struct {

@ -26,7 +26,10 @@ import (
func FriendPb2DB(friend *sdkws.FriendInfo) *relation.FriendModel { func FriendPb2DB(friend *sdkws.FriendInfo) *relation.FriendModel {
dbFriend := &relation.FriendModel{} dbFriend := &relation.FriendModel{}
utils.CopyStructFields(dbFriend, friend) err := utils.CopyStructFields(dbFriend, friend)
if err != nil {
return nil
}
dbFriend.FriendUserID = friend.FriendUser.UserID dbFriend.FriendUserID = friend.FriendUser.UserID
dbFriend.CreateTime = utils.UnixSecondToTime(friend.CreateTime) dbFriend.CreateTime = utils.UnixSecondToTime(friend.CreateTime)
return dbFriend return dbFriend
@ -69,7 +72,11 @@ func FriendsDB2Pb(
} }
for _, friend := range friendsDB { for _, friend := range friendsDB {
friendPb := &sdkws.FriendInfo{FriendUser: &sdkws.UserInfo{}} friendPb := &sdkws.FriendInfo{FriendUser: &sdkws.UserInfo{}}
utils.CopyStructFields(friendPb, friend) err := utils.CopyStructFields(friendPb, friend)
if err != nil {
return nil, err
}
friendPb.FriendUser.UserID = users[friend.FriendUserID].UserID friendPb.FriendUser.UserID = users[friend.FriendUserID].UserID
friendPb.FriendUser.Nickname = users[friend.FriendUserID].Nickname friendPb.FriendUser.Nickname = users[friend.FriendUserID].Nickname
friendPb.FriendUser.FaceURL = users[friend.FriendUserID].FaceURL friendPb.FriendUser.FaceURL = users[friend.FriendUserID].FaceURL
@ -79,6 +86,7 @@ func FriendsDB2Pb(
friendsPb = append(friendsPb, friendPb) friendsPb = append(friendsPb, friendPb)
} }
return friendsPb, nil return friendsPb, nil
} }
func FriendRequestDB2Pb( func FriendRequestDB2Pb(

@ -16,7 +16,6 @@ package cache
import ( import (
"context" "context"
"errors"
"math/big" "math/big"
"strings" "strings"
"time" "time"
@ -220,16 +219,16 @@ func (c *ConversationRedisCache) DelConversations(ownerUserID string, conversati
return cache return cache
} }
func (c *ConversationRedisCache) getConversationIndex(convsation *relationtb.ConversationModel, keys []string) (int, error) { // func (c *ConversationRedisCache) getConversationIndex(convsation *relationtb.ConversationModel, keys []string) (int, error) {
key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID) // key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID)
for _i, _key := range keys { // for _i, _key := range keys {
if _key == key { // if _key == key {
return _i, nil // return _i, nil
} // }
} // }
return 0, errors.New("not found key:" + key + " in keys") // return 0, errors.New("not found key:" + key + " in keys")
} // }
func (c *ConversationRedisCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationtb.ConversationModel, error) { func (c *ConversationRedisCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationtb.ConversationModel, error) {
//var keys []string //var keys []string
@ -333,7 +332,7 @@ func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupI
return cache return cache
} }
func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex(conversationID string, conversationIDs []string) (int, error) { /* func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex(conversationID string, conversationIDs []string) (int, error) {
for _i, _conversationID := range conversationIDs { for _i, _conversationID := range conversationIDs {
if _conversationID == conversationID { if _conversationID == conversationID {
return _i, nil return _i, nil
@ -341,21 +340,21 @@ func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex(conversationID strin
} }
return 0, errors.New("not found key:" + conversationID + " in keys") return 0, errors.New("not found key:" + conversationID + " in keys")
} } */
//func (c *ConversationRedisCache) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) { /* func (c *ConversationRedisCache) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) {
// conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID) conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
// if err != nil { if err != nil {
// return nil, err return nil, err
// } }
// var keys []string var keys []string
// for _, conversarionID := range conversationIDs { for _, conversarionID := range conversationIDs {
// keys = append(keys, c.getConversationHasReadSeqKey(ownerUserID, conversarionID)) keys = append(keys, c.getConversationHasReadSeqKey(ownerUserID, conversarionID))
// } }
// return batchGetCacheMap(ctx, c.rcClient, keys, conversationIDs, c.expireTime, c.getUserAllHasReadSeqsIndex, func(ctx context.Context) (map[string]int64, error) { return batchGetCacheMap(ctx, c.rcClient, keys, conversationIDs, c.expireTime, c.getUserAllHasReadSeqsIndex, func(ctx context.Context) (map[string]int64, error) {
// return c.conversationDB.GetUserAllHasReadSeqs(ctx, ownerUserID) return c.conversationDB.GetUserAllHasReadSeqs(ctx, ownerUserID)
// }) })
//} } */
func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache { func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache {
cache := c.NewCache() cache := c.NewCache()

@ -150,11 +150,15 @@ func (c *msgCache) getHasReadSeqKey(conversationID string, userID string) string
} }
func (c *msgCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error { func (c *msgCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error {
return utils.Wrap1(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err()) return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err())
} }
func (c *msgCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) { func (c *msgCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, getkey(conversationID)).Int64()) val, err := c.rdb.Get(ctx, getkey(conversationID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
} }
func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) {
@ -215,7 +219,11 @@ func (c *msgCache) getConversationUserMinSeqKey(conversationID, userID string) s
} }
func (c *msgCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { func (c *msgCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()) val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
} }
func (c *msgCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) { func (c *msgCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) {
@ -225,7 +233,7 @@ func (c *msgCache) GetConversationUserMinSeqs(ctx context.Context, conversationI
} }
func (c *msgCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error { func (c *msgCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
return utils.Wrap1(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err()) return errs.Wrap(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err())
} }
func (c *msgCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) { func (c *msgCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
@ -241,7 +249,7 @@ func (c *msgCache) SetUserConversationsMinSeqs(ctx context.Context, userID strin
} }
func (c *msgCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { func (c *msgCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return utils.Wrap1(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err()) return errs.Wrap(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err())
} }
func (c *msgCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error { func (c *msgCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error {
@ -263,12 +271,15 @@ func (c *msgCache) GetHasReadSeqs(ctx context.Context, userID string, conversati
} }
func (c *msgCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { func (c *msgCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64()) val, err := c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, err
}
return val, nil
} }
func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err()) return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err())
} }
@ -695,7 +706,11 @@ func (c *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime i
} }
func (c *msgCache) GetGetuiToken(ctx context.Context) (string, error) { func (c *msgCache) GetGetuiToken(ctx context.Context) (string, error) {
return utils.Wrap2(c.rdb.Get(ctx, getuiToken).Result()) val, err := c.rdb.Get(ctx, getuiToken).Result()
if err != nil {
return "", errs.Wrap(err)
}
return val, nil
} }
func (c *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { func (c *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
@ -703,7 +718,11 @@ func (c *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime
} }
func (c *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) { func (c *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) {
return utils.Wrap2(c.rdb.Get(ctx, getuiTaskID).Result()) val, err := c.rdb.Get(ctx, getuiTaskID).Result()
if err != nil {
return "", errs.Wrap(err)
}
return val, nil
} }
func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
@ -721,7 +740,11 @@ func (c *msgCache) SetFcmToken(ctx context.Context, account string, platformID i
} }
func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
return utils.Wrap2(c.rdb.Get(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Result()) val, err := c.rdb.Get(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Result()
if err != nil {
return "", errs.Wrap(err)
}
return val, nil
} }
func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error { func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
@ -739,7 +762,8 @@ func (c *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string
} }
func (c *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { func (c *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
return utils.Wrap2(c.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int()) val, err := c.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int()
return val, errs.Wrap(err)
} }
func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
@ -772,42 +796,31 @@ func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType in
func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result() n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
if err != nil { if err != nil {
return false, utils.Wrap(err, "") return false, errs.Wrap(err)
} }
return n > 0, nil return n > 0, nil
} }
func (c *msgCache) SetMessageTypeKeyValue( func (c *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
ctx context.Context,
clientMsgID string,
sessionType int32,
typeKey, value string,
) error {
return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err()) return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err())
} }
func (c *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { func (c *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
return utils.Wrap2(c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()) val, err := c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()
return val, errs.Wrap(err)
} }
func (c *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { func (c *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
return utils.Wrap2(c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()) val, err := c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()
return val, errs.Wrap(err)
} }
func (c *msgCache) GetOneMessageAllReactionList( func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
ctx context.Context, val, err := c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
clientMsgID string, return val, errs.Wrap(err)
sessionType int32,
) (map[string]string, error) {
return utils.Wrap2(c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result())
} }
func (c *msgCache) DeleteOneMessageKey( func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
ctx context.Context,
clientMsgID string,
sessionType int32,
subKey string,
) error {
return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
} }

@ -18,13 +18,14 @@ import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/OpenIMSDK/tools/errs"
"github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/golang-jwt/jwt/v4" "github.com/golang-jwt/jwt/v4"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/tokenverify" "github.com/OpenIMSDK/tools/tokenverify"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
) )
@ -70,16 +71,17 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
} }
} }
if len(deleteTokenKey) != 0 { if len(deleteTokenKey) != 0 {
err := a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey) err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey)
if err != nil { if err != nil {
return "", err return "", err
} }
} }
claims := tokenverify.BuildClaims(userID, platformID, a.accessExpire) claims := tokenverify.BuildClaims(userID, platformID, a.accessExpire)
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
tokenString, err := token.SignedString([]byte(a.accessSecret)) tokenString, err := token.SignedString([]byte(a.accessSecret))
if err != nil { if err != nil {
return "", utils.Wrap(err, "") return "", errs.Wrap(err)
} }
return tokenString, a.cache.AddTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken) return tokenString, a.cache.AddTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken)
} }

@ -105,7 +105,7 @@ func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context,
now := time.Now() now := time.Now()
for _, v := range NotUserIDs { for _, v := range NotUserIDs {
temp := new(relationtb.ConversationModel) temp := new(relationtb.ConversationModel)
if err := utils.CopyStructFields(temp, conversation); err != nil { if err = utils.CopyStructFields(temp, conversation); err != nil {
return err return err
} }
temp.OwnerUserID = v temp.OwnerUserID = v

@ -425,7 +425,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID) log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID)
prommetrics.SeqSetFailedCounter.Inc() prommetrics.SeqSetFailedCounter.Inc()
} }
return lastMaxSeq, isNew, utils.Wrap(err, "") return lastMaxSeq, isNew, errs.Wrap(err)
} }
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
@ -863,7 +863,7 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
} }
} }
if len(delMsgIndexs) > 0 { if len(delMsgIndexs) > 0 {
if err := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, msgDocModel.DocID, delMsgIndexs); err != nil { if err = db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, msgDocModel.DocID, delMsgIndexs); err != nil {
log.ZError(ctx, "deleteMsgRecursion DeleteMsgsInOneDocByIndex failed", err, "conversationID", conversationID, "index", index) log.ZError(ctx, "deleteMsgRecursion DeleteMsgsInOneDocByIndex failed", err, "conversationID", conversationID, "index", index)
} }
delStruct.minSeq = int64(msgDocModel.Msg[delMsgIndexs[len(delMsgIndexs)-1]].Msg.Seq) delStruct.minSeq = int64(msgDocModel.Msg[delMsgIndexs[len(delMsgIndexs)-1]].Msg.Seq)

@ -0,0 +1,276 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// docURL: https://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html
package aws
import (
"context"
"errors"
"fmt"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
sdk "github.com/aws/aws-sdk-go/service/s3"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/s3"
)
const (
minPartSize int64 = 1024 * 1024 * 1 // 1MB
maxPartSize int64 = 1024 * 1024 * 1024 * 5 // 5GB
maxNumSize int64 = 10000
)
// const (
// imagePng = "png"
// imageJpg = "jpg"
// imageJpeg = "jpeg"
// imageGif = "gif"
// imageWebp = "webp"
// )
// const successCode = http.StatusOK
// const (
// videoSnapshotImagePng = "png"
// videoSnapshotImageJpg = "jpg"
// )
func NewAWS() (s3.Interface, error) {
conf := config.Config.Object.Aws
credential := credentials.NewStaticCredentials(
conf.AccessKeyID, // accessKey
conf.AccessKeySecret, // secretKey
"") // sts的临时凭证
sess, err := session.NewSession(&aws.Config{
Region: aws.String(conf.Region), // 桶所在的区域
Credentials: credential,
})
if err != nil {
return nil, err
}
return &Aws{
bucket: conf.Bucket,
client: sdk.New(sess),
credential: credential,
}, nil
}
type Aws struct {
bucket string
client *sdk.S3
credential *credentials.Credentials
}
func (a *Aws) Engine() string {
return "aws"
}
func (a *Aws) InitiateMultipartUpload(ctx context.Context, name string) (*s3.InitiateMultipartUploadResult, error) {
input := &sdk.CreateMultipartUploadInput{
Bucket: aws.String(a.bucket), // TODO: To be verified whether it is required
Key: aws.String(name),
}
result, err := a.client.CreateMultipartUploadWithContext(ctx, input)
if err != nil {
return nil, err
}
return &s3.InitiateMultipartUploadResult{
Bucket: *result.Bucket,
Key: *result.Key,
UploadID: *result.UploadId,
}, nil
}
func (a *Aws) CompleteMultipartUpload(ctx context.Context, uploadID string, name string, parts []s3.Part) (*s3.CompleteMultipartUploadResult, error) {
sdkParts := make([]*sdk.CompletedPart, len(parts))
for i, part := range parts {
sdkParts[i] = &sdk.CompletedPart{
ETag: aws.String(part.ETag),
PartNumber: aws.Int64(int64(part.PartNumber)),
}
}
input := &sdk.CompleteMultipartUploadInput{
Bucket: aws.String(a.bucket), // TODO: To be verified whether it is required
Key: aws.String(name),
UploadId: aws.String(uploadID),
MultipartUpload: &sdk.CompletedMultipartUpload{
Parts: sdkParts,
},
}
result, err := a.client.CompleteMultipartUploadWithContext(ctx, input)
if err != nil {
return nil, err
}
return &s3.CompleteMultipartUploadResult{
Location: *result.Location,
Bucket: *result.Bucket,
Key: *result.Key,
ETag: *result.ETag,
}, nil
}
func (a *Aws) PartSize(ctx context.Context, size int64) (int64, error) {
if size <= 0 {
return 0, errors.New("size must be greater than 0")
}
if size > maxPartSize*maxNumSize {
return 0, fmt.Errorf("AWS size must be less than the maximum allowed limit")
}
if size <= minPartSize*maxNumSize {
return minPartSize, nil
}
partSize := size / maxNumSize
if size%maxNumSize != 0 {
partSize++
}
return partSize, nil
}
func (a *Aws) DeleteObject(ctx context.Context, name string) error {
_, err := a.client.DeleteObjectWithContext(ctx, &sdk.DeleteObjectInput{
Bucket: aws.String(a.bucket),
Key: aws.String(name),
})
return err
}
func (a *Aws) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyObjectInfo, error) {
result, err := a.client.CopyObjectWithContext(ctx, &sdk.CopyObjectInput{
Bucket: aws.String(a.bucket),
Key: aws.String(dst),
CopySource: aws.String(src),
})
if err != nil {
return nil, err
}
return &s3.CopyObjectInfo{
ETag: *result.CopyObjectResult.ETag,
Key: dst,
}, nil
}
func (a *Aws) IsNotFound(err error) bool {
if err == nil {
return false
}
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case sdk.ErrCodeNoSuchKey:
return true
default:
return false
}
}
return false
}
func (a *Aws) AbortMultipartUpload(ctx context.Context, uploadID string, name string) error {
_, err := a.client.AbortMultipartUploadWithContext(ctx, &sdk.AbortMultipartUploadInput{
Bucket: aws.String(a.bucket),
Key: aws.String(name),
UploadId: aws.String(uploadID),
})
return err
}
func (a *Aws) ListUploadedParts(ctx context.Context, uploadID string, name string, partNumberMarker int, maxParts int) (*s3.ListUploadedPartsResult, error) {
result, err := a.client.ListPartsWithContext(ctx, &sdk.ListPartsInput{
Bucket: aws.String(a.bucket),
Key: aws.String(name),
UploadId: aws.String(uploadID),
MaxParts: aws.Int64(int64(maxParts)),
PartNumberMarker: aws.Int64(int64(partNumberMarker)),
})
if err != nil {
return nil, err
}
parts := make([]s3.UploadedPart, len(result.Parts))
for i, part := range result.Parts {
parts[i] = s3.UploadedPart{
PartNumber: int(*part.PartNumber),
LastModified: *part.LastModified,
Size: *part.Size,
ETag: *part.ETag,
}
}
return &s3.ListUploadedPartsResult{
Key: *result.Key,
UploadID: *result.UploadId,
NextPartNumberMarker: int(*result.NextPartNumberMarker),
MaxParts: int(*result.MaxParts),
UploadedParts: parts,
}, nil
}
func (a *Aws) PartLimit() *s3.PartLimit {
return &s3.PartLimit{
MinPartSize: minPartSize,
MaxPartSize: maxPartSize,
MaxNumSize: maxNumSize,
}
}
func (a *Aws) PresignedPutObject(ctx context.Context, name string, expire time.Duration) (string, error) {
req, _ := a.client.PutObjectRequest(&sdk.PutObjectInput{
Bucket: aws.String(a.bucket),
Key: aws.String(name),
})
url, err := req.Presign(expire)
if err != nil {
return "", err
}
return url, nil
}
func (a *Aws) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) {
result, err := a.client.GetObjectWithContext(ctx, &sdk.GetObjectInput{
Bucket: aws.String(a.bucket),
Key: aws.String(name),
})
if err != nil {
return nil, err
}
res := &s3.ObjectInfo{
Key: name,
ETag: *result.ETag,
Size: *result.ContentLength,
LastModified: *result.LastModified,
}
return res, nil
}
// AccessURL todo
func (a *Aws) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) {
// todo
return "", nil
}
func (a *Aws) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
// todo
return nil, nil
}
func (a *Aws) AuthSign(ctx context.Context, uploadID string, name string, expire time.Duration, partNumbers []int) (*s3.AuthSignResult, error) {
// todo
return nil, nil
}

@ -106,7 +106,7 @@ func (c *Controller) InitiateUpload(ctx context.Context, hash string, size int64
partNumber++ partNumber++
} }
if maxParts > 0 && partNumber > 0 && partNumber < maxParts { if maxParts > 0 && partNumber > 0 && partNumber < maxParts {
return nil, errors.New(fmt.Sprintf("too many parts: %d", partNumber)) return nil, fmt.Errorf("too many parts: %d", partNumber)
} }
if info, err := c.StatObject(ctx, c.HashPath(hash)); err == nil { if info, err := c.StatObject(ctx, c.HashPath(hash)); err == nil {
return nil, &HashAlreadyExistsError{Object: info} return nil, &HashAlreadyExistsError{Object: info}

@ -52,8 +52,8 @@ const (
const successCode = http.StatusOK const successCode = http.StatusOK
const ( const (
videoSnapshotImagePng = "png" // videoSnapshotImagePng = "png"
videoSnapshotImageJpg = "jpg" // videoSnapshotImageJpg = "jpg"
) )
func NewCos(config *config.GlobalConfig) (s3.Interface, error) { func NewCos(config *config.GlobalConfig) (s3.Interface, error) {

@ -142,7 +142,7 @@ func (m *Minio) initMinio(ctx context.Context) error {
return fmt.Errorf("check bucket exists error: %w", err) return fmt.Errorf("check bucket exists error: %w", err)
} }
if !exists { if !exists {
if err := m.core.Client.MakeBucket(ctx, conf.Bucket, minio.MakeBucketOptions{}); err != nil { if err = m.core.Client.MakeBucket(ctx, conf.Bucket, minio.MakeBucketOptions{}); err != nil {
return fmt.Errorf("make bucket error: %w", err) return fmt.Errorf("make bucket error: %w", err)
} }
} }
@ -151,7 +151,7 @@ func (m *Minio) initMinio(ctx context.Context) error {
`{"Version": "2012-10-17","Statement": [{"Action": ["s3:GetObject","s3:PutObject"],"Effect": "Allow","Principal": {"AWS": ["*"]},"Resource": ["arn:aws:s3:::%s/*"],"Sid": ""}]}`, `{"Version": "2012-10-17","Statement": [{"Action": ["s3:GetObject","s3:PutObject"],"Effect": "Allow","Principal": {"AWS": ["*"]},"Resource": ["arn:aws:s3:::%s/*"],"Sid": ""}]}`,
conf.Bucket, conf.Bucket,
) )
if err := m.core.Client.SetBucketPolicy(ctx, conf.Bucket, policy); err != nil { if err = m.core.Client.SetBucketPolicy(ctx, conf.Bucket, policy); err != nil {
return err return err
} }
} }

@ -82,7 +82,8 @@ func (m *Minio) getImageThumbnailURL(ctx context.Context, name string, expire ti
} }
key, err := m.cache.GetThumbnailKey(ctx, name, opt.Format, opt.Width, opt.Height, func(ctx context.Context) (string, error) { key, err := m.cache.GetThumbnailKey(ctx, name, opt.Format, opt.Width, opt.Height, func(ctx context.Context) (string, error) {
if img == nil { if img == nil {
reader, err := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{}) var reader *minio.Object
reader, err = m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{})
if err != nil { if err != nil {
return "", err return "", err
} }
@ -103,7 +104,7 @@ func (m *Minio) getImageThumbnailURL(ctx context.Context, name string, expire ti
err = gif.Encode(buf, thumbnail, nil) err = gif.Encode(buf, thumbnail, nil)
} }
cacheKey := filepath.Join(imageThumbnailPath, info.Etag, fmt.Sprintf("image_w%d_h%d.%s", opt.Width, opt.Height, opt.Format)) cacheKey := filepath.Join(imageThumbnailPath, info.Etag, fmt.Sprintf("image_w%d_h%d.%s", opt.Width, opt.Height, opt.Format))
if _, err := m.core.Client.PutObject(ctx, m.bucket, cacheKey, buf, int64(buf.Len()), minio.PutObjectOptions{}); err != nil { if _, err = m.core.Client.PutObject(ctx, m.bucket, cacheKey, buf, int64(buf.Len()), minio.PutObjectOptions{}); err != nil {
return "", err return "", err
} }
return cacheKey, nil return cacheKey, nil

@ -52,10 +52,10 @@ const (
const successCode = http.StatusOK const successCode = http.StatusOK
const ( /* const (
videoSnapshotImagePng = "png" videoSnapshotImagePng = "png"
videoSnapshotImageJpg = "jpg" videoSnapshotImageJpg = "jpg"
) ) */
func NewOSS(config *config.GlobalConfig) (s3.Interface, error) { func NewOSS(config *config.GlobalConfig) (s3.Interface, error) {
conf := config.Object.Oss conf := config.Object.Oss

@ -35,7 +35,6 @@ import (
"github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/utils"
table "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" table "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
) )
@ -79,7 +78,7 @@ func (m *MsgMongoDriver) UpdateMsg(
update := bson.M{"$set": bson.M{field: value}} update := bson.M{"$set": bson.M{field: value}}
res, err := m.MsgCollection.UpdateOne(ctx, filter, update) res, err := m.MsgCollection.UpdateOne(ctx, filter, update)
if err != nil { if err != nil {
return nil, utils.Wrap(err, "") return nil, errs.Wrap(err)
} }
return res, nil return res, nil
} }
@ -106,7 +105,7 @@ func (m *MsgMongoDriver) PushUnique(
} }
res, err := m.MsgCollection.UpdateOne(ctx, filter, update) res, err := m.MsgCollection.UpdateOne(ctx, filter, update)
if err != nil { if err != nil {
return nil, utils.Wrap(err, "") return nil, errs.Wrap(err)
} }
return res, nil return res, nil
} }
@ -118,7 +117,7 @@ func (m *MsgMongoDriver) UpdateMsgContent(ctx context.Context, docID string, ind
bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", index): msg}}, bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", index): msg}},
) )
if err != nil { if err != nil {
return utils.Wrap(err, "") return errs.Wrap(err)
} }
return nil return nil
} }
@ -133,7 +132,7 @@ func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(
msg.Status = status msg.Status = status
bytes, err := proto.Marshal(msg) bytes, err := proto.Marshal(msg)
if err != nil { if err != nil {
return utils.Wrap(err, "") return errs.Wrap(err)
} }
_, err = m.MsgCollection.UpdateOne( _, err = m.MsgCollection.UpdateOne(
ctx, ctx,
@ -141,7 +140,7 @@ func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(
bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", seqIndex): bytes}}, bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", seqIndex): bytes}},
) )
if err != nil { if err != nil {
return utils.Wrap(err, "") return errs.Wrap(err)
} }
return nil return nil
} }
@ -167,12 +166,12 @@ func (m *MsgMongoDriver) GetMsgDocModelByIndex(
findOpts, findOpts,
) )
if err != nil { if err != nil {
return nil, utils.Wrap(err, "") return nil, errs.Wrap(err)
} }
var msgs []table.MsgDocModel var msgs []table.MsgDocModel
err = cursor.All(ctx, &msgs) err = cursor.All(ctx, &msgs)
if err != nil { if err != nil {
return nil, utils.Wrap(err, fmt.Sprintf("cursor is %s", cursor.Current.String())) return nil, errs.Wrap(err, fmt.Sprintf("cursor is %s", cursor.Current.String()))
} }
if len(msgs) > 0 { if len(msgs) > 0 {
return &msgs[0], nil return &msgs[0], nil
@ -223,7 +222,7 @@ func (m *MsgMongoDriver) DeleteMsgsInOneDocByIndex(ctx context.Context, docID st
} }
_, err := m.MsgCollection.UpdateMany(ctx, bson.M{"doc_id": docID}, updates) _, err := m.MsgCollection.UpdateMany(ctx, bson.M{"doc_id": docID}, updates)
if err != nil { if err != nil {
return utils.Wrap(err, "") return errs.Wrap(err)
} }
return nil return nil
} }
@ -247,47 +246,42 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(
indexs = append(indexs, m.model.GetMsgIndex(seq)) indexs = append(indexs, m.model.GetMsgIndex(seq))
} }
pipeline := mongo.Pipeline{ pipeline := mongo.Pipeline{
{ bson.D{{Key: "$match", Value: bson.D{
{"$match", bson.D{ {Key: "doc_id", Value: docID},
{"doc_id", docID}, }}},
bson.D{{Key: "$project", Value: bson.D{
{Key: "_id", Value: 0},
{Key: "doc_id", Value: 1},
{Key: "msgs", Value: bson.D{
{Key: "$map", Value: bson.D{
{Key: "input", Value: indexs},
{Key: "as", Value: "index"},
{Key: "in", Value: bson.D{
{Key: "$let", Value: bson.D{
{Key: "vars", Value: bson.D{
{Key: "currentMsg", Value: bson.D{
{Key: "$arrayElemAt", Value: bson.A{"$msgs", "$$index"}},
}}, }},
},
{
{"$project", bson.D{
{"_id", 0},
{"doc_id", 1},
{"msgs", bson.D{
{"$map", bson.D{
{"input", indexs},
{"as", "index"},
{"in", bson.D{
{"$let", bson.D{
{"vars", bson.D{
{"currentMsg", bson.D{
{"$arrayElemAt", []string{"$msgs", "$$index"}},
}}, }},
{Key: "in", Value: bson.D{
{Key: "$cond", Value: bson.D{
{Key: "if", Value: bson.D{
{Key: "$in", Value: bson.A{userID, "$$currentMsg.del_list"}},
}}, }},
{"in", bson.D{ {Key: "then", Value: nil},
{"$cond", bson.D{ {Key: "else", Value: "$$currentMsg"},
{"if", bson.D{
{"$in", []string{userID, "$$currentMsg.del_list"}},
}}, }},
{"then", nil},
{"else", "$$currentMsg"},
}}, }},
}}, }},
}}, }},
}}, }},
}}, }},
}}, }}},
}}, bson.D{{Key: "$project", Value: bson.D{
}, {Key: "msgs.del_list", Value: 0},
{ }}},
{"$project", bson.D{
{"msgs.del_list", 0},
}},
},
} }
cur, err := m.MsgCollection.Aggregate(ctx, pipeline) cur, err := m.MsgCollection.Aggregate(ctx, pipeline)
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
@ -801,7 +795,7 @@ func (m *MsgMongoDriver) RangeUserSendCount(
} }
defer cur.Close(ctx) defer cur.Close(ctx)
var result []Result var result []Result
if err := cur.All(ctx, &result); err != nil { if err = cur.All(ctx, &result); err != nil {
return 0, 0, nil, nil, errs.Wrap(err) return 0, 0, nil, nil, errs.Wrap(err)
} }
if len(result) == 0 { if len(result) == 0 {
@ -1050,7 +1044,7 @@ func (m *MsgMongoDriver) RangeGroupSendCount(
} }
defer cur.Close(ctx) defer cur.Close(ctx)
var result []Result var result []Result
if err := cur.All(ctx, &result); err != nil { if err = cur.All(ctx, &result); err != nil {
return 0, 0, nil, nil, errs.Wrap(err) return 0, 0, nil, nil, errs.Wrap(err)
} }
if len(result) == 0 { if len(result) == 0 {

@ -18,7 +18,6 @@ import (
"context" "context"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/utils"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/options"
@ -119,7 +118,7 @@ func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string
opts, opts,
) )
if err != nil { if err != nil {
return utils.Wrap(err, "transaction failed") return errs.Wrap(err, "transaction failed")
} }
} }
return nil return nil

@ -106,31 +106,31 @@ func PostReturn(ctx context.Context, url string, header map[string]string, input
} }
func callBackPostReturn(ctx context.Context, url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error { func callBackPostReturn(ctx context.Context, url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error {
defer log.ZDebug(ctx, "callback", "url", url, "command", command, "input", input, "output", output, "callbackConfig", callbackConfig)
//
//v := urllib.Values{}
//v.Set(constant.CallbackCommand, command)
//url = url + "/" + v.Encode()
url = url + "/" + command url = url + "/" + command
log.ZInfo(ctx, "callback", "url", url, "input", input, "config", callbackConfig)
b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut) b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut)
if err != nil { if err != nil {
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {
log.ZWarn(ctx, "callback failed but continue", err, "url", url) log.ZInfo(ctx, "callback failed but continue", err, "url", url)
return nil return nil
} }
log.ZWarn(ctx, "callback network failed", err, "url", url, "input", input)
return errs.ErrNetwork.Wrap(err.Error()) return errs.ErrNetwork.Wrap(err.Error())
} }
defer log.ZDebug(ctx, "callback", "data", string(b))
if err = json.Unmarshal(b, output); err != nil { if err = json.Unmarshal(b, output); err != nil {
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {
log.ZWarn(ctx, "callback failed but continue", err, "url", url) log.ZWarn(ctx, "callback failed but continue", err, "url", url)
return nil return nil
} }
log.ZWarn(ctx, "callback json unmarshal failed", err, "url", url, "input", input, "response", string(b))
return errs.ErrData.WithDetail(err.Error() + "response format error") return errs.ErrData.WithDetail(err.Error() + "response format error")
} }
if err := output.Parse(); err != nil {
return output.Parse() log.ZWarn(ctx, "callback parse failed", err, "url", url, "input", input, "response", string(b))
return err
}
log.ZInfo(ctx, "callback success", "url", url, "input", input, "response", string(b))
return nil
} }
func CallBackPostReturn(ctx context.Context, url string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error { func CallBackPostReturn(ctx context.Context, url string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error {

@ -27,7 +27,6 @@ import (
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
@ -158,10 +157,10 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag
// Marshal the protobuf message // Marshal the protobuf message
bMsg, err := proto.Marshal(msg) bMsg, err := proto.Marshal(msg)
if err != nil { if err != nil {
return 0, 0, utils.Wrap(err, "kafka proto Marshal err") return 0, 0, errs.Wrap(err, "kafka proto Marshal err")
} }
if len(bMsg) == 0 { if len(bMsg) == 0 {
return 0, 0, utils.Wrap(errEmptyMsg, "") return 0, 0, errs.Wrap(errEmptyMsg, "")
} }
// Prepare Kafka message // Prepare Kafka message
@ -173,13 +172,13 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag
// Validate message key and value // Validate message key and value
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 { if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
return 0, 0, utils.Wrap(errEmptyMsg, "") return 0, 0, errs.Wrap(errEmptyMsg)
} }
// Attach context metadata as headers // Attach context metadata as headers
header, err := GetMQHeaderWithContext(ctx) header, err := GetMQHeaderWithContext(ctx)
if err != nil { if err != nil {
return 0, 0, utils.Wrap(err, "") return 0, 0, errs.Wrap(err)
} }
kMsg.Headers = header kMsg.Headers = header
@ -187,7 +186,7 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag
partition, offset, err := p.producer.SendMessage(kMsg) partition, offset, err := p.producer.SendMessage(kMsg)
if err != nil { if err != nil {
log.ZWarn(ctx, "p.producer.SendMessage error", err) log.ZWarn(ctx, "p.producer.SendMessage error", err)
return 0, 0, utils.Wrap(err, "") return 0, 0, errs.Wrap(err)
} }
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key", kMsg.Key, "key length", kMsg.Value.Length()) log.ZDebug(ctx, "ByteEncoder SendMessage end", "key", kMsg.Key, "key length", kMsg.Value.Length())

@ -408,7 +408,7 @@ func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx conte
return err return err
} }
tips := &sdkws.GroupApplicationAcceptedTips{Group: group, HandleMsg: req.HandledMsg} tips := &sdkws.GroupApplicationAcceptedTips{Group: group, HandleMsg: req.HandledMsg}
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err return err
} }
for _, userID := range append(userIDs, req.FromUserID) { for _, userID := range append(userIDs, req.FromUserID) {
@ -441,7 +441,7 @@ func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx conte
return err return err
} }
tips := &sdkws.GroupApplicationRejectedTips{Group: group, HandleMsg: req.HandledMsg} tips := &sdkws.GroupApplicationRejectedTips{Group: group, HandleMsg: req.HandledMsg}
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err return err
} }
for _, userID := range append(userIDs, req.FromUserID) { for _, userID := range append(userIDs, req.FromUserID) {

@ -43,6 +43,9 @@ func NewThird(discov discoveryregistry.SvcDiscoveryRegistry, config *config.Glob
} }
client := third.NewThirdClient(conn) client := third.NewThirdClient(conn)
minioClient, err := minioInit(config) minioClient, err := minioInit(config)
if err != nil {
panic(err)
}
return &Third{discov: discov, Client: client, conn: conn, MinioClient: minioClient,Config: config} return &Third{discov: discov, Client: client, conn: conn, MinioClient: minioClient,Config: config}
} }

@ -36,9 +36,7 @@ func (s *Statistics) output() {
var timeIntervalNum uint64 var timeIntervalNum uint64
for { for {
sum = *s.AllCount sum = *s.AllCount
select { <-t.C
case <-t.C:
}
if *s.AllCount-sum <= 0 { if *s.AllCount-sum <= 0 {
intervalCount = 0 intervalCount = 0
} else { } else {

@ -222,6 +222,14 @@ def "KODO_ACCESS_KEY_SECRET" # 七
def "KODO_SESSION_TOKEN" # 七牛云OSS的会话令牌 def "KODO_SESSION_TOKEN" # 七牛云OSS的会话令牌
def "KODO_PUBLIC_READ" "false" # 公有读 def "KODO_PUBLIC_READ" "false" # 公有读
# AWS Configuration Information
def "AWS_ENDPOINT" "" # AWS endpoint, generally not needed unless using a specific service
def "AWS_REGION" "us-east-1" # AWS Region
def "AWS_BUCKET" "demo-9999999" # AWS S3 Bucket Name
def "AWS_ACCESS_KEY_ID" # AWS Access Key ID
def "AWS_SECRET_ACCESS_KEY" # AWS Secret Access Key
def "AWS_PUBLIC_READ" "false" # Public read access
###################### Redis 配置信息 ###################### ###################### Redis 配置信息 ######################
def "REDIS_PORT" "16379" # Redis的端口 def "REDIS_PORT" "16379" # Redis的端口
def "REDIS_ADDRESS" "${DOCKER_BRIDGE_GATEWAY}" # Redis的地址 def "REDIS_ADDRESS" "${DOCKER_BRIDGE_GATEWAY}" # Redis的地址

@ -34,7 +34,7 @@ func mockInitCfg() error {
} }
func TestRedis(t *testing.T) { func TestRedis(t *testing.T) {
conf, err := initCfg(defaultCfgPath) conf, err := initCfg()
conf.Redis.Address = []string{ conf.Redis.Address = []string{
"172.16.8.142:7000", "172.16.8.142:7000",
//"172.16.8.142:7000", "172.16.8.142:7001", "172.16.8.142:7002", "172.16.8.142:7003", "172.16.8.142:7004", "172.16.8.142:7005", //"172.16.8.142:7000", "172.16.8.142:7001", "172.16.8.142:7002", "172.16.8.142:7003", "172.16.8.142:7004", "172.16.8.142:7005",

@ -29,7 +29,7 @@ var (
hyphenRegex = regexp.MustCompile(`^[a-zA-Z0-9\-]+\.[a-zA-Z0-9]+$`) hyphenRegex = regexp.MustCompile(`^[a-zA-Z0-9\-]+\.[a-zA-Z0-9]+$`)
) )
// CheckDirectoCheckDirectoryries initiates the checking process for the specified directories using configuration from config.Config. // CheckDirectory initiates the checking process for the specified directories using configuration from config.Config.
func CheckDirectory(cfg *config.Config) error { func CheckDirectory(cfg *config.Config) error {
ignoreMap := make(map[string]struct{}) ignoreMap := make(map[string]struct{})
for _, dir := range cfg.IgnoreDirs { for _, dir := range cfg.IgnoreDirs {

Loading…
Cancel
Save