|
|
|
@ -10,7 +10,6 @@ import (
|
|
|
|
|
"Open_IM/pkg/common/tracelog"
|
|
|
|
|
cacheRpc "Open_IM/pkg/proto/cache"
|
|
|
|
|
"Open_IM/pkg/proto/msg"
|
|
|
|
|
pbPush "Open_IM/pkg/proto/push"
|
|
|
|
|
sdkws "Open_IM/pkg/proto/sdkws"
|
|
|
|
|
"Open_IM/pkg/utils"
|
|
|
|
|
"context"
|
|
|
|
@ -310,40 +309,6 @@ func (rpc *msgServer) encapsulateMsgData(msg *sdkws.MsgData) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rpc *msgServer) sendMsgToWriter(ctx context.Context, m *pbChat.MsgDataToMQ, key string, status string) error {
|
|
|
|
|
switch status {
|
|
|
|
|
case constant.OnlineStatus:
|
|
|
|
|
if m.MsgData.ContentType == constant.SignalingNotification {
|
|
|
|
|
rpcPushMsg := pbPush.PushMsgReq{OperationID: m.OperationID, MsgData: m.MsgData, PushToUserID: key}
|
|
|
|
|
grpcConn, err := rpc.GetConn(ctx, config.Config.RpcRegisterName.OpenImPushName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
msgClient := pbPush.NewPushMsgServiceClient(grpcConn)
|
|
|
|
|
_, err = msgClient.PushMsg(context.Background(), &rpcPushMsg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error())
|
|
|
|
|
return err
|
|
|
|
|
} else {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
pid, offset, err := rpc.messageWriter.SendMessage(m, key, m.OperationID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)
|
|
|
|
|
} else {
|
|
|
|
|
// log.NewWarn(m.OperationID, "sendMsgToWriter client msgID ", m.MsgData.ClientMsgID)
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
case constant.OfflineStatus:
|
|
|
|
|
pid, offset, err := rpc.messageWriter.SendMessage(m, key, m.OperationID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return errors.New("status error")
|
|
|
|
|
}
|
|
|
|
|
func GetMsgID(sendID string) string {
|
|
|
|
|
t := time.Now().Format("2006-01-02 15:04:05")
|
|
|
|
|
return utils.Md5(t + "-" + sendID + "-" + strconv.Itoa(rand.Int()))
|
|
|
|
@ -390,29 +355,6 @@ func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType i
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func modifyMessageByUserMessageReceiveOptoptimization(userID, sourceID string, sessionType int, operationID string, options *map[string]bool) bool {
|
|
|
|
|
conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType)
|
|
|
|
|
opt, err := db.DB.GetSingleConversationRecvMsgOpt(userID, conversationID)
|
|
|
|
|
if err != nil && err != go_redis.Nil {
|
|
|
|
|
log.NewError(operationID, "GetSingleConversationMsgOpt from redis err", userID, conversationID, err.Error())
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
switch opt {
|
|
|
|
|
case constant.ReceiveMessage:
|
|
|
|
|
return true
|
|
|
|
|
case constant.NotReceiveMessage:
|
|
|
|
|
return false
|
|
|
|
|
case constant.ReceiveNotNotifyMessage:
|
|
|
|
|
if *options == nil {
|
|
|
|
|
*options = make(map[string]bool, 10)
|
|
|
|
|
}
|
|
|
|
|
utils.SetSwitchFromOptions(*options, constant.IsOfflinePush, false)
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func valueCopy(pb *msg.SendMsgReq) *msg.SendMsgReq {
|
|
|
|
|
offlinePushInfo := sdkws.OfflinePushInfo{}
|
|
|
|
|
if pb.MsgData.OfflinePushInfo != nil {
|
|
|
|
|