1、统一结构体方法 receiver,都用 pointer
2、使用 errors.Is 来做错误判断
3、修复单词拼写的错误
pull/2541/head
Mew151 1 year ago committed by GitHub
parent 7f6b4da8eb
commit 1022b297d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -49,14 +49,14 @@ func NewMessageApi(msgRpcClient *rpcclient.Message, userRpcClient *rpcclient.Use
userRpcClient: rpcclient.NewUserRpcClientByUser(userRpcClient), imAdminUserID: imAdminUserID} userRpcClient: rpcclient.NewUserRpcClientByUser(userRpcClient), imAdminUserID: imAdminUserID}
} }
func (MessageApi) SetOptions(options map[string]bool, value bool) { func (*MessageApi) SetOptions(options map[string]bool, value bool) {
datautil.SetSwitchFromOptions(options, constant.IsHistory, value) datautil.SetSwitchFromOptions(options, constant.IsHistory, value)
datautil.SetSwitchFromOptions(options, constant.IsPersistent, value) datautil.SetSwitchFromOptions(options, constant.IsPersistent, value)
datautil.SetSwitchFromOptions(options, constant.IsSenderSync, value) datautil.SetSwitchFromOptions(options, constant.IsSenderSync, value)
datautil.SetSwitchFromOptions(options, constant.IsConversationUpdate, value) datautil.SetSwitchFromOptions(options, constant.IsConversationUpdate, value)
} }
func (m MessageApi) newUserSendMsgReq(_ *gin.Context, params *apistruct.SendMsg) *msg.SendMsgReq { func (m *MessageApi) newUserSendMsgReq(_ *gin.Context, params *apistruct.SendMsg) *msg.SendMsgReq {
var newContent string var newContent string
options := make(map[string]bool, 5) options := make(map[string]bool, 5)
switch params.ContentType { switch params.ContentType {
@ -231,7 +231,7 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
} }
// Set the status to successful if the message is sent. // Set the status to successful if the message is sent.
var status int = constant.MsgSendSuccessed var status = constant.MsgSendSuccessed
// Attempt to update the message sending status in the system. // Attempt to update the message sending status in the system.
_, err = m.Client.SetSendMsgStatus(c, &msg.SetSendMsgStatusReq{ _, err = m.Client.SetSendMsgStatus(c, &msg.SetSendMsgStatusReq{

@ -16,6 +16,7 @@ package msgtransfer
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
@ -137,7 +138,7 @@ func (m *MsgTransfer) Start(index int, config *Config) error {
return return
} }
if err := prommetrics.TransferInit(prometheusPort); err != nil && err != http.ErrServerClosed { if err := prommetrics.TransferInit(prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, "prometheus start error", "prometheusPort", prometheusPort) netErr = errs.WrapMsg(err, "prometheus start error", "prometheusPort", prometheusPort)
netDone <- struct{}{} netDone <- struct{}{}
} }

@ -16,6 +16,7 @@ package msgtransfer
import ( import (
"context" "context"
"errors"
"github.com/IBM/sarama" "github.com/IBM/sarama"
"github.com/go-redis/redis" "github.com/go-redis/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -187,7 +188,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
if len(storageMessageList) > 0 { if len(storageMessageList) > 0 {
msg := storageMessageList[0] msg := storageMessageList[0]
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) {
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList) log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
return return
} }

@ -91,13 +91,13 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
} }
} }
func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (*OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim( func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(
sess sarama.ConsumerGroupSession, sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim, claim sarama.ConsumerGroupClaim,
) error { // a instance in the consumer group ) error { // an instance in the consumer group
log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
for msg := range claim.Messages() { for msg := range claim.Messages() {

@ -19,20 +19,20 @@ type OnlinePusher interface {
pushToUserIDs *[]string) []string pushToUserIDs *[]string) []string
} }
type emptyOnlinePUsher struct{} type emptyOnlinePusher struct{}
func newEmptyOnlinePUsher() *emptyOnlinePUsher { func newEmptyOnlinePusher() *emptyOnlinePusher {
return &emptyOnlinePUsher{} return &emptyOnlinePusher{}
} }
func (emptyOnlinePUsher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, func (emptyOnlinePusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
log.ZWarn(ctx, "emptyOnlinePUsher GetConnsAndOnlinePush", nil) log.ZWarn(ctx, "emptyOnlinePusher GetConnsAndOnlinePush", nil)
return nil, nil return nil, nil
} }
func (u emptyOnlinePUsher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *sdkws.MsgData, func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *sdkws.MsgData,
wsResults []*msggateway.SingleMsgToUserResults, pushToUserIDs *[]string) []string { wsResults []*msggateway.SingleMsgToUserResults, pushToUserIDs *[]string) []string {
log.ZWarn(ctx, "emptyOnlinePUsher GetOnlinePushFailedUserIDs", nil) log.ZWarn(ctx, "emptyOnlinePusher GetOnlinePushFailedUserIDs", nil)
return nil return nil
} }
@ -45,7 +45,7 @@ func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) Onli
case "etcd": case "etcd":
return NewDefaultAllNode(disCov, config) return NewDefaultAllNode(disCov, config)
default: default:
return newEmptyOnlinePUsher() return newEmptyOnlinePusher()
} }
} }

@ -154,17 +154,17 @@ func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *
return nil return nil
} }
} }
offlinePUshUserID := []string{msg.RecvID} offlinePushUserID := []string{msg.RecvID}
//receiver offline push //receiver offline push
if err = c.webhookBeforeOfflinePush(ctx, &c.config.WebhooksConfig.BeforeOfflinePush, if err = c.webhookBeforeOfflinePush(ctx, &c.config.WebhooksConfig.BeforeOfflinePush,
offlinePUshUserID, msg, nil); err != nil { offlinePushUserID, msg, nil); err != nil {
return err return err
} }
err = c.offlinePushMsg(ctx, msg, offlinePUshUserID) err = c.offlinePushMsg(ctx, msg, offlinePushUserID)
if err != nil { if err != nil {
log.ZWarn(ctx, "offlinePushMsg failed", err, "offlinePUshUserID", offlinePUshUserID, "msg", msg) log.ZWarn(ctx, "offlinePushMsg failed", err, "offlinePushUserID", offlinePushUserID, "msg", msg)
return nil return nil
} }

@ -90,8 +90,8 @@ type CommonMsgDatabase interface {
// to mq // to mq
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
MsgToPushMQ(ctx context.Context, key, conversarionID string, msg2mq *sdkws.MsgData) (int32, int64, error) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
MsgToMongoMQ(ctx context.Context, key, conversarionID string, msgs []*sdkws.MsgData, lastSeq int64) error MsgToMongoMQ(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error)
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error)

@ -118,9 +118,9 @@ func (m *MsgMgo) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID strin
} }
func (m *MsgMgo) getMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) ([]*model.MsgInfoModel, error) { func (m *MsgMgo) getMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) ([]*model.MsgInfoModel, error) {
indexs := make([]int64, 0, len(seqs)) indexes := make([]int64, 0, len(seqs))
for _, seq := range seqs { for _, seq := range seqs {
indexs = append(indexs, m.model.GetMsgIndex(seq)) indexes = append(indexes, m.model.GetMsgIndex(seq))
} }
pipeline := mongo.Pipeline{ pipeline := mongo.Pipeline{
bson.D{{Key: "$match", Value: bson.D{ bson.D{{Key: "$match", Value: bson.D{
@ -131,7 +131,7 @@ func (m *MsgMgo) getMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID strin
{Key: "doc_id", Value: 1}, {Key: "doc_id", Value: 1},
{Key: "msgs", Value: bson.D{ {Key: "msgs", Value: bson.D{
{Key: "$map", Value: bson.D{ {Key: "$map", Value: bson.D{
{Key: "input", Value: indexs}, {Key: "input", Value: indexes},
{Key: "as", Value: "index"}, {Key: "as", Value: "index"},
{Key: "in", Value: bson.D{ {Key: "in", Value: bson.D{
{Key: "$arrayElemAt", Value: bson.A{"$msgs", "$$index"}}, {Key: "$arrayElemAt", Value: bson.A{"$msgs", "$$index"}},

@ -92,15 +92,15 @@ type GroupCount struct {
Count int64 `bson:"count"` Count int64 `bson:"count"`
} }
func (MsgDocModel) TableName() string { func (*MsgDocModel) TableName() string {
return MsgTableName return MsgTableName
} }
func (MsgDocModel) GetSingleGocMsgNum() int64 { func (*MsgDocModel) GetSingleGocMsgNum() int64 {
return singleGocMsgNum return singleGocMsgNum
} }
func (MsgDocModel) GetSingleGocMsgNum5000() int64 { func (*MsgDocModel) GetSingleGocMsgNum5000() int64 {
return singleGocMsgNum5000 return singleGocMsgNum5000
} }
@ -108,12 +108,12 @@ func (m *MsgDocModel) IsFull() bool {
return m.Msg[len(m.Msg)-1].Msg != nil return m.Msg[len(m.Msg)-1].Msg != nil
} }
func (m MsgDocModel) GetDocID(conversationID string, seq int64) string { func (m *MsgDocModel) GetDocID(conversationID string, seq int64) string {
seqSuffix := (seq - 1) / singleGocMsgNum seqSuffix := (seq - 1) / singleGocMsgNum
return m.indexGen(conversationID, seqSuffix) return m.indexGen(conversationID, seqSuffix)
} }
func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 { func (m *MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 {
t := make(map[string][]int64) t := make(map[string][]int64)
for i := 0; i < len(seqs); i++ { for i := 0; i < len(seqs); i++ {
docID := m.GetDocID(conversationID, seqs[i]) docID := m.GetDocID(conversationID, seqs[i])
@ -127,15 +127,15 @@ func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[st
return t return t
} }
func (MsgDocModel) GetMsgIndex(seq int64) int64 { func (*MsgDocModel) GetMsgIndex(seq int64) int64 {
return (seq - 1) % singleGocMsgNum return (seq - 1) % singleGocMsgNum
} }
func (MsgDocModel) indexGen(conversationID string, seqSuffix int64) string { func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10) return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
} }
func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) { func (*MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs { for _, v := range seqs {
msgModel := new(sdkws.MsgData) msgModel := new(sdkws.MsgData)
msgModel.Seq = v msgModel.Seq = v

Loading…
Cancel
Save