feat: optimize server code (#1931)

* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* utils.Wrap -> errs.Wrap

* utils.Wrap -> errs.Wrap

---------

Co-authored-by: withchao <withchao@users.noreply.github.com>
pull/1936/head
chao 4 months ago committed by GitHub
parent 01886eee06
commit d5d2803e76
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -5,7 +5,7 @@ go 1.19
require (
firebase.google.com/go v3.13.0+incompatible
github.com/OpenIMSDK/protocol v0.0.55
github.com/OpenIMSDK/tools v0.0.35
github.com/OpenIMSDK/tools v0.0.36
github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/dtm-labs/rockscache v0.1.1
github.com/gin-gonic/gin v1.9.1

@ -20,8 +20,8 @@ github.com/IBM/sarama v1.42.2 h1:VoY4hVIZ+WQJ8G9KNY/SQlWguBQXQ9uvFPOnrcu8hEw=
github.com/IBM/sarama v1.42.2/go.mod h1:FLPGUGwYqEs62hq2bVG6Io2+5n+pS6s/WOXVKWSLFtE=
github.com/OpenIMSDK/protocol v0.0.55 h1:eBjg8DyuhxGmuCUjpoZjg6MJJJXU/xJ3xJwFhrn34yA=
github.com/OpenIMSDK/protocol v0.0.55/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.35 h1:YH8UYoaErXqfNrwpUvQxe8nhL++gFH6qCisQPyzk0w8=
github.com/OpenIMSDK/tools v0.0.35/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE=
github.com/OpenIMSDK/tools v0.0.36 h1:BT0q64l4f3QJDW16Rc0uJYt1gQFkiPoUQYQ33vo0EcE=
github.com/OpenIMSDK/tools v0.0.36/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs=

@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"github.com/OpenIMSDK/tools/errs"
"runtime/debug"
"sync"
"sync/atomic"
@ -173,7 +174,7 @@ func (c *Client) handleMessage(message []byte) error {
var err error
message, err = c.longConnServer.DecompressWithPool(message)
if err != nil {
return utils.Wrap(err, "")
return errs.Wrap(err)
}
}
@ -182,15 +183,15 @@ func (c *Client) handleMessage(message []byte) error {
err := c.longConnServer.Decode(message, binaryReq)
if err != nil {
return utils.Wrap(err, "")
return errs.Wrap(err)
}
if err := c.longConnServer.Validate(binaryReq); err != nil {
return utils.Wrap(err, "")
return errs.Wrap(err)
}
if binaryReq.SendID != c.UserID {
return utils.Wrap(errors.New("exception conn userID not same to req userID"), binaryReq.String())
return errs.Wrap(errors.New("exception conn userID not same to req userID"), binaryReq.String())
}
ctx := mcontext.WithMustInfoCtx(
@ -313,7 +314,7 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
encodedBuf, err := c.longConnServer.Encode(resp)
if err != nil {
return utils.Wrap(err, "")
return errs.Wrap(err)
}
c.w.Lock()
@ -323,7 +324,7 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
if c.IsCompress {
resultBuf, compressErr := c.longConnServer.CompressWithPool(encodedBuf)
if compressErr != nil {
return utils.Wrap(compressErr, "")
return errs.Wrap(compressErr)
}
return c.conn.WriteMessage(MessageBinary, resultBuf)
}
@ -341,7 +342,7 @@ func (c *Client) writePongMsg() error {
err := c.conn.SetWriteDeadline(writeWait)
if err != nil {
return utils.Wrap(err, "")
return errs.Wrap(err)
}
return c.conn.WriteMessage(PongMessage, nil)

@ -18,10 +18,9 @@ import (
"bytes"
"compress/gzip"
"errors"
"github.com/OpenIMSDK/tools/errs"
"io"
"sync"
"github.com/OpenIMSDK/tools/utils"
)
var (
@ -47,10 +46,10 @@ func (g *GzipCompressor) Compress(rawData []byte) ([]byte, error) {
gzipBuffer := bytes.Buffer{}
gz := gzip.NewWriter(&gzipBuffer)
if _, err := gz.Write(rawData); err != nil {
return nil, utils.Wrap(err, "")
return nil, errs.Wrap(err)
}
if err := gz.Close(); err != nil {
return nil, utils.Wrap(err, "")
return nil, errs.Wrap(err)
}
return gzipBuffer.Bytes(), nil
}
@ -63,10 +62,10 @@ func (g *GzipCompressor) CompressWithPool(rawData []byte) ([]byte, error) {
gz.Reset(&gzipBuffer)
if _, err := gz.Write(rawData); err != nil {
return nil, utils.Wrap(err, "")
return nil, errs.Wrap(err)
}
if err := gz.Close(); err != nil {
return nil, utils.Wrap(err, "")
return nil, errs.Wrap(err)
}
return gzipBuffer.Bytes(), nil
}
@ -75,11 +74,11 @@ func (g *GzipCompressor) DeCompress(compressedData []byte) ([]byte, error) {
buff := bytes.NewBuffer(compressedData)
reader, err := gzip.NewReader(buff)
if err != nil {
return nil, utils.Wrap(err, "NewReader failed")
return nil, errs.Wrap(err, "NewReader failed")
}
compressedData, err = io.ReadAll(reader)
if err != nil {
return nil, utils.Wrap(err, "ReadAll failed")
return nil, errs.Wrap(err, "ReadAll failed")
}
_ = reader.Close()
return compressedData, nil
@ -88,18 +87,18 @@ func (g *GzipCompressor) DeCompress(compressedData []byte) ([]byte, error) {
func (g *GzipCompressor) DecompressWithPool(compressedData []byte) ([]byte, error) {
reader := gzipReaderPool.Get().(*gzip.Reader)
if reader == nil {
return nil, errors.New("NewReader failed")
return nil, errs.Wrap(errors.New("NewReader failed"))
}
defer gzipReaderPool.Put(reader)
err := reader.Reset(bytes.NewReader(compressedData))
if err != nil {
return nil, utils.Wrap(err, "NewReader failed")
return nil, errs.Wrap(err, "NewReader failed")
}
compressedData, err = io.ReadAll(reader)
if err != nil {
return nil, utils.Wrap(err, "ReadAll failed")
return nil, errs.Wrap(err, "ReadAll failed")
}
_ = reader.Close()
return compressedData, nil

@ -17,8 +17,7 @@ package msggateway
import (
"bytes"
"encoding/gob"
"github.com/OpenIMSDK/tools/utils"
"github.com/OpenIMSDK/tools/errs"
)
type Encoder interface {
@ -47,7 +46,7 @@ func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error {
dec := gob.NewDecoder(buff)
err := dec.Decode(decodeData)
if err != nil {
return utils.Wrap(err, "")
return errs.Wrap(err)
}
return nil
}

@ -34,8 +34,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
http2 "github.com/openimsdk/open-im-server/v3/pkg/common/http"
"github.com/OpenIMSDK/tools/utils"
)
var (
@ -137,7 +135,7 @@ func (g *Client) GetTaskID(ctx context.Context, token string, pushReq PushReq) (
pushReq.Settings = &Settings{TTL: &ttl}
err := g.request(ctx, taskURL, pushReq, token, &respTask)
if err != nil {
return "", utils.Wrap(err, "")
return "", errs.Wrap(err)
}
return respTask.TaskID, nil
}

@ -29,8 +29,6 @@ import (
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/tokenverify"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
@ -105,7 +103,7 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR
func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) {
claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret())
if err != nil {
return nil, utils.Wrap(err, "")
return nil, errs.Wrap(err)
}
m, err := s.authDatabase.GetTokensWithoutError(ctx, claims.UserID, claims.PlatformID)
if err != nil {
@ -121,7 +119,7 @@ func (s *authServer) parseToken(ctx context.Context, tokensString string) (claim
case constant.KickedToken:
return nil, errs.ErrTokenKicked.Wrap()
default:
return nil, utils.Wrap(errs.ErrTokenUnknown, "")
return nil, errs.Wrap(errs.ErrTokenUnknown)
}
}
return nil, errs.ErrTokenNotExist.Wrap()

@ -956,7 +956,7 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf
return nil, err
}
if group.Status == constant.GroupStatusDismissed {
return nil, utils.Wrap(errs.ErrDismissedAlready, "")
return nil, errs.Wrap(errs.ErrDismissedAlready)
}
resp := &pbgroup.SetGroupInfoResp{}
count, err := s.db.FindGroupMemberNum(ctx, group.GroupID)

@ -134,7 +134,7 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
}
bs, err := json.Marshal(t)
if err != nil {
return "", utils.Wrap(err, "")
return "", errs.Wrap(err)
}
write = true
@ -153,7 +153,7 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
if err != nil {
log.ZError(ctx, "cache json.Unmarshal failed", err, "key", key, "value", v, "expire", expire)
return t, utils.Wrap(err, "")
return t, errs.Wrap(err)
}
return t, nil

@ -149,11 +149,15 @@ func (c *msgCache) getHasReadSeqKey(conversationID string, userID string) string
}
func (c *msgCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error {
return utils.Wrap1(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err())
return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err())
}
func (c *msgCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, getkey(conversationID)).Int64())
val, err := c.rdb.Get(ctx, getkey(conversationID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
}
func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) {
@ -214,7 +218,11 @@ func (c *msgCache) getConversationUserMinSeqKey(conversationID, userID string) s
}
func (c *msgCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64())
val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
}
func (c *msgCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) {
@ -224,7 +232,7 @@ func (c *msgCache) GetConversationUserMinSeqs(ctx context.Context, conversationI
}
func (c *msgCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
return utils.Wrap1(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err())
return errs.Wrap(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err())
}
func (c *msgCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
@ -240,7 +248,7 @@ func (c *msgCache) SetUserConversationsMinSeqs(ctx context.Context, userID strin
}
func (c *msgCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return utils.Wrap1(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err())
return errs.Wrap(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err())
}
func (c *msgCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error {
@ -262,12 +270,15 @@ func (c *msgCache) GetHasReadSeqs(ctx context.Context, userID string, conversati
}
func (c *msgCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64())
val, err := c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, err
}
return val, nil
}
func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err())
}
@ -694,7 +705,11 @@ func (c *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime i
}
func (c *msgCache) GetGetuiToken(ctx context.Context) (string, error) {
return utils.Wrap2(c.rdb.Get(ctx, getuiToken).Result())
val, err := c.rdb.Get(ctx, getuiToken).Result()
if err != nil {
return "", errs.Wrap(err)
}
return val, nil
}
func (c *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
@ -702,7 +717,11 @@ func (c *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime
}
func (c *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) {
return utils.Wrap2(c.rdb.Get(ctx, getuiTaskID).Result())
val, err := c.rdb.Get(ctx, getuiTaskID).Result()
if err != nil {
return "", errs.Wrap(err)
}
return val, nil
}
func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
@ -720,7 +739,11 @@ func (c *msgCache) SetFcmToken(ctx context.Context, account string, platformID i
}
func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
return utils.Wrap2(c.rdb.Get(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Result())
val, err := c.rdb.Get(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Result()
if err != nil {
return "", errs.Wrap(err)
}
return val, nil
}
func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
@ -738,7 +761,8 @@ func (c *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string
}
func (c *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
return utils.Wrap2(c.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int())
val, err := c.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int()
return val, errs.Wrap(err)
}
func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
@ -771,42 +795,31 @@ func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType in
func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
if err != nil {
return false, utils.Wrap(err, "")
return false, errs.Wrap(err)
}
return n > 0, nil
}
func (c *msgCache) SetMessageTypeKeyValue(
ctx context.Context,
clientMsgID string,
sessionType int32,
typeKey, value string,
) error {
func (c *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err())
}
func (c *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
return utils.Wrap2(c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result())
val, err := c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()
return val, errs.Wrap(err)
}
func (c *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
return utils.Wrap2(c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result())
val, err := c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()
return val, errs.Wrap(err)
}
func (c *msgCache) GetOneMessageAllReactionList(
ctx context.Context,
clientMsgID string,
sessionType int32,
) (map[string]string, error) {
return utils.Wrap2(c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result())
func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
val, err := c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
return val, errs.Wrap(err)
}
func (c *msgCache) DeleteOneMessageKey(
ctx context.Context,
clientMsgID string,
sessionType int32,
subKey string,
) error {
func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
}

@ -16,6 +16,7 @@ package controller
import (
"context"
"github.com/OpenIMSDK/tools/errs"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
@ -23,8 +24,6 @@ import (
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/tokenverify"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
)
@ -78,7 +77,7 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
tokenString, err := token.SignedString([]byte(a.accessSecret))
if err != nil {
return "", utils.Wrap(err, "")
return "", errs.Wrap(err)
}
return tokenString, a.cache.AddTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken)
}

@ -408,7 +408,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID)
prommetrics.SeqSetFailedCounter.Inc()
}
return lastMaxSeq, isNew, utils.Wrap(err, "")
return lastMaxSeq, isNew, errs.Wrap(err)
}
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {

@ -35,8 +35,6 @@ import (
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/utils"
table "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
)
@ -79,7 +77,7 @@ func (m *MsgMongoDriver) UpdateMsg(
update := bson.M{"$set": bson.M{field: value}}
res, err := m.MsgCollection.UpdateOne(ctx, filter, update)
if err != nil {
return nil, utils.Wrap(err, "")
return nil, errs.Wrap(err)
}
return res, nil
}
@ -106,7 +104,7 @@ func (m *MsgMongoDriver) PushUnique(
}
res, err := m.MsgCollection.UpdateOne(ctx, filter, update)
if err != nil {
return nil, utils.Wrap(err, "")
return nil, errs.Wrap(err)
}
return res, nil
}
@ -118,7 +116,7 @@ func (m *MsgMongoDriver) UpdateMsgContent(ctx context.Context, docID string, ind
bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", index): msg}},
)
if err != nil {
return utils.Wrap(err, "")
return errs.Wrap(err)
}
return nil
}
@ -133,7 +131,7 @@ func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(
msg.Status = status
bytes, err := proto.Marshal(msg)
if err != nil {
return utils.Wrap(err, "")
return errs.Wrap(err)
}
_, err = m.MsgCollection.UpdateOne(
ctx,
@ -141,7 +139,7 @@ func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(
bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", seqIndex): bytes}},
)
if err != nil {
return utils.Wrap(err, "")
return errs.Wrap(err)
}
return nil
}
@ -167,12 +165,12 @@ func (m *MsgMongoDriver) GetMsgDocModelByIndex(
findOpts,
)
if err != nil {
return nil, utils.Wrap(err, "")
return nil, errs.Wrap(err)
}
var msgs []table.MsgDocModel
err = cursor.All(ctx, &msgs)
if err != nil {
return nil, utils.Wrap(err, fmt.Sprintf("cursor is %s", cursor.Current.String()))
return nil, errs.Wrap(err, fmt.Sprintf("cursor is %s", cursor.Current.String()))
}
if len(msgs) > 0 {
return &msgs[0], nil
@ -223,7 +221,7 @@ func (m *MsgMongoDriver) DeleteMsgsInOneDocByIndex(ctx context.Context, docID st
}
_, err := m.MsgCollection.UpdateMany(ctx, bson.M{"doc_id": docID}, updates)
if err != nil {
return utils.Wrap(err, "")
return errs.Wrap(err)
}
return nil
}

@ -18,7 +18,6 @@ import (
"context"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/utils"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
@ -119,7 +118,7 @@ func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string
opts,
)
if err != nil {
return utils.Wrap(err, "transaction failed")
return errs.Wrap(err, "transaction failed")
}
}
return nil

@ -106,31 +106,31 @@ func PostReturn(ctx context.Context, url string, header map[string]string, input
}
func callBackPostReturn(ctx context.Context, url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error {
defer log.ZDebug(ctx, "callback", "url", url, "command", command, "input", input, "output", output, "callbackConfig", callbackConfig)
//
//v := urllib.Values{}
//v.Set(constant.CallbackCommand, command)
//url = url + "/" + v.Encode()
url = url + "/" + command
log.ZInfo(ctx, "callback", "url", url, "input", input, "config", callbackConfig)
b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut)
if err != nil {
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {
log.ZWarn(ctx, "callback failed but continue", err, "url", url)
log.ZInfo(ctx, "callback failed but continue", err, "url", url)
return nil
}
log.ZWarn(ctx, "callback network failed", err, "url", url, "input", input)
return errs.ErrNetwork.Wrap(err.Error())
}
defer log.ZDebug(ctx, "callback", "data", string(b))
if err = json.Unmarshal(b, output); err != nil {
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {
log.ZWarn(ctx, "callback failed but continue", err, "url", url)
return nil
}
log.ZWarn(ctx, "callback json unmarshal failed", err, "url", url, "input", input, "response", string(b))
return errs.ErrData.WithDetail(err.Error() + "response format error")
}
return output.Parse()
if err := output.Parse(); err != nil {
log.ZWarn(ctx, "callback parse failed", err, "url", url, "input", input, "response", string(b))
return err
}
log.ZInfo(ctx, "callback success", "url", url, "input", input, "response", string(b))
return nil
}
func CallBackPostReturn(ctx context.Context, url string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error {

@ -27,7 +27,6 @@ import (
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
"google.golang.org/protobuf/proto"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -153,10 +152,10 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag
// Marshal the protobuf message
bMsg, err := proto.Marshal(msg)
if err != nil {
return 0, 0, utils.Wrap(err, "kafka proto Marshal err")
return 0, 0, errs.Wrap(err, "kafka proto Marshal err")
}
if len(bMsg) == 0 {
return 0, 0, utils.Wrap(errEmptyMsg, "")
return 0, 0, errs.Wrap(errEmptyMsg, "")
}
// Prepare Kafka message
@ -168,13 +167,13 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag
// Validate message key and value
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
return 0, 0, utils.Wrap(errEmptyMsg, "")
return 0, 0, errs.Wrap(errEmptyMsg)
}
// Attach context metadata as headers
header, err := GetMQHeaderWithContext(ctx)
if err != nil {
return 0, 0, utils.Wrap(err, "")
return 0, 0, errs.Wrap(err)
}
kMsg.Headers = header
@ -182,7 +181,7 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag
partition, offset, err := p.producer.SendMessage(kMsg)
if err != nil {
log.ZWarn(ctx, "p.producer.SendMessage error", err)
return 0, 0, utils.Wrap(err, "")
return 0, 0, errs.Wrap(err)
}
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key", kMsg.Key, "key length", kMsg.Value.Length())

Loading…
Cancel
Save