diff --git a/internal/api/manage/management_chat.go b/internal/api/manage/management_chat.go index 9a74dcb60..d18dfc451 100644 --- a/internal/api/manage/management_chat.go +++ b/internal/api/manage/management_chat.go @@ -508,4 +508,5 @@ type MessageRevoked struct { ClientMsgID string `mapstructure:"clientMsgID" json:"clientMsgID" validate:"required"` RevokerNickname string `mapstructure:"revokerNickname" json:"revokerNickname"` SessionType int32 `mapstructure:"sessionType" json:"sessionType" validate:"required"` + Seq uint32 `mapstructure:"seq" json:"seq" validate:"required"` } diff --git a/internal/api/user/user.go b/internal/api/user/user.go index dff63db97..6ba0855e4 100644 --- a/internal/api/user/user.go +++ b/internal/api/user/user.go @@ -164,7 +164,7 @@ func GetUsersPublicInfo(c *gin.Context) { params := api.GetUsersInfoReq{} if err := c.BindJSON(¶ms); err != nil { log.NewError("0", "BindJSON failed ", err.Error()) - c.JSON(http.StatusBadRequest, gin.H{"errCode": http.StatusBadRequest, "errMsg": err.Error()}) + c.JSON(http.StatusOK, gin.H{"errCode": http.StatusBadRequest, "errMsg": err.Error()}) return } req := &rpc.GetUserInfoReq{} @@ -176,7 +176,7 @@ func GetUsersPublicInfo(c *gin.Context) { if !ok { errMsg := req.OperationID + " " + "GetUserIDFromToken failed " + errInfo + " token:" + c.Request.Header.Get("token") log.NewError(req.OperationID, errMsg) - c.JSON(http.StatusBadRequest, gin.H{"errCode": 500, "errMsg": errMsg}) + c.JSON(http.StatusOK, gin.H{"errCode": 500, "errMsg": errMsg}) return } diff --git a/internal/rpc/msg/query_msg.go b/internal/rpc/msg/query_msg.go new file mode 100644 index 000000000..31a088354 --- /dev/null +++ b/internal/rpc/msg/query_msg.go @@ -0,0 +1,48 @@ +package msg + +import ( + commonDB "Open_IM/pkg/common/db" + "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" + "Open_IM/pkg/proto/msg" + "context" + go_redis "github.com/go-redis/redis/v8" +) + +func (rpc *rpcChat) GetSuperGroupMsg(context context.Context, req *msg.GetSuperGroupMsgReq) (*msg.GetSuperGroupMsgResp, error) { + resp := new(msg.GetSuperGroupMsgResp) + redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(req.GroupID, []uint32{req.Seq}, req.OperationID) + if err != nil { + if err != go_redis.Nil { + promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList)) + log.Error(req.OperationID, "get message from redis exception", err.Error(), failedSeqList) + } else { + log.Debug(req.OperationID, "get message from redis is nil", failedSeqList) + } + msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(req.GroupID, failedSeqList, req.OperationID) + if err1 != nil { + promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) + log.Error(req.OperationID, "GetSuperGroupMsg data error", req.String(), err.Error()) + resp.ErrCode = 201 + resp.ErrMsg = err.Error() + return resp, nil + } else { + promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList)) + redisMsgList = append(redisMsgList, msgList...) + for _, m := range msgList { + resp.MsgData = m + } + + } + } else { + promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList)) + for _, m := range redisMsgList { + resp.MsgData = m + } + } + return resp, nil +} + +func (rpc *rpcChat) GetWriteDiffMsg(context context.Context, req *msg.GetWriteDiffMsgReq) (*msg.GetWriteDiffMsgResp, error) { + panic("implement me") +} diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index c20a613db..415d6fff1 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -10,6 +10,7 @@ import ( "Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/proto/msg" "Open_IM/pkg/utils" + "github.com/golang/protobuf/proto" "net" "strconv" "strings" @@ -19,17 +20,15 @@ import ( "google.golang.org/grpc" ) -//var ( -// sendMsgSuccessCounter prometheus.Counter -// sendMsgFailedCounter prometheus.Counter -//) - +type MessageWriter interface { + SendMessage(m proto.Message, key string, operationID string) (int32, int64, error) +} type rpcChat struct { rpcPort int rpcRegisterName string etcdSchema string etcdAddr []string - onlineProducer *kafka.Producer + messageWriter MessageWriter //offlineProducer *kafka.Producer delMsgCh chan deleteMsg } @@ -49,7 +48,7 @@ func NewRpcChatServer(port int) *rpcChat { etcdSchema: config.Config.Etcd.EtcdSchema, etcdAddr: config.Config.Etcd.EtcdAddr, } - rc.onlineProducer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) + rc.messageWriter = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) //rc.offlineProducer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.Ws2mschatOffline.Topic) rc.delMsgCh = make(chan deleteMsg, 1000) return &rc diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index bf553714f..bcb6ba524 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -36,6 +36,26 @@ var ( ExcludeContentType = []int{constant.HasReadReceipt, constant.GroupHasReadReceipt} ) +type Validator interface { + validate(pb *pbChat.SendMsgReq) (bool, int32, string) +} + +//type MessageValidator struct { +// +//} + +type MessageRevoked struct { + RevokerID string `json:"revokerID"` + RevokerRole int32 `json:"revokerRole"` + ClientMsgID string `json:"clientMsgID"` + RevokerNickname string `json:"revokerNickname"` + RevokeTime int64 `json:"revokeTime"` + SourceMessageSendTime int64 `json:"sourceMessageSendTime"` + SourceMessageSendID string `json:"sourceMessageSendID"` + SourceMessageSenderNickname string `json:"sourceMessageSenderNickname"` + SessionType int32 `json:"sessionType"` + Seq uint32 `json:"seq"` +} type MsgCallBackReq struct { SendID string `json:"sendID"` RecvID string `json:"recvID"` @@ -76,7 +96,7 @@ func isMessageHasReadEnabled(pb *pbChat.SendMsgReq) (bool, int32, string) { return true, 0, "" } -func messageVerification(data *pbChat.SendMsgReq) (bool, int32, string, []string) { +func (rpc *rpcChat) messageVerification(data *pbChat.SendMsgReq) (bool, int32, string, []string) { switch data.MsgData.SessionType { case constant.SingleChatType: if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) { @@ -140,6 +160,31 @@ func messageVerification(data *pbChat.SendMsgReq) (bool, int32, string, []string if err != nil { return false, 201, err.Error(), nil } + if data.MsgData.ContentType == constant.AdvancedRevoke { + revokeMesage := new(MessageRevoked) + err := utils.JsonStringToStruct(string(data.MsgData.Content), revokeMesage) + if err != nil { + log.Error(data.OperationID, "json unmarshal err:", err.Error()) + return false, 201, err.Error(), nil + } + req := pbChat.GetSuperGroupMsgReq{OperationID: data.OperationID, Seq: revokeMesage.Seq, GroupID: data.MsgData.GroupID} + resp, err := rpc.GetSuperGroupMsg(context.Background(), &req) + if err != nil { + log.Error(data.OperationID, "GetSuperGroupMsgReq err:", err.Error()) + } else if resp.ErrCode != 0 { + log.Error(data.OperationID, "GetSuperGroupMsgReq err:", err.Error()) + } else { + if resp.MsgData != nil && resp.MsgData.ClientMsgID == revokeMesage.ClientMsgID && resp.MsgData.Seq == revokeMesage.Seq { + revokeMesage.SourceMessageSendTime = resp.MsgData.SendTime + revokeMesage.SourceMessageSenderNickname = resp.MsgData.SenderNickname + revokeMesage.SourceMessageSendID = resp.MsgData.SendID + data.MsgData.Content = []byte(utils.StructToJsonString(revokeMesage)) + } else { + return false, 201, errors.New("msg err").Error(), nil + } + } + + } if groupInfo.GroupType == constant.SuperGroup { return true, 0, "", nil } else { @@ -149,29 +194,6 @@ func messageVerification(data *pbChat.SendMsgReq) (bool, int32, string, []string log.NewError(data.OperationID, errMsg) return false, 201, errMsg, nil } - - // - //getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: data.OperationID, GroupID: data.MsgData.GroupID} - //etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, data.OperationID) - //if etcdConn == nil { - // errMsg := data.OperationID + "getcdv3.GetDefaultConn == nil" - // log.NewError(data.OperationID, errMsg) - // return false, 201, errMsg, nil - //} - //client := pbCache.NewCacheClient(etcdConn) - // cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) - // - // - //if err != nil { - // log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) - // //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache failed", "", 0) - // return false, 201, err.Error(), nil - //} - //if cacheResp.CommonResp.ErrCode != 0 { - // log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String()) - // //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache logic failed", "", 0) - // return false, cacheResp.CommonResp.ErrCode, cacheResp.CommonResp.ErrMsg, nil - //} if !token_verify.IsManagerUserID(data.MsgData.SendID) { if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin { return true, 0, "", userIDList @@ -281,7 +303,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) } t1 = time.Now() - flag, errCode, errMsg, _ = messageVerification(pb) + flag, errCode, errMsg, _ = rpc.messageVerification(pb) log.Debug(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1)) if !flag { return returnMsg(&replay, pb, errCode, errMsg, "", 0) @@ -293,8 +315,8 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S msgToMQSingle.MsgData = pb.MsgData log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) t1 = time.Now() - err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) - log.Info(pb.OperationID, "sendMsgToKafka ", " cost time: ", time.Since(t1)) + err1 := rpc.sendMsgToWriter(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) + log.Info(pb.OperationID, "sendMsgToWriter ", " cost time: ", time.Since(t1)) if err1 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err :RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String(), err1.Error()) promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter) @@ -303,8 +325,8 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself t1 = time.Now() - err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus) - log.Info(pb.OperationID, "sendMsgToKafka ", " cost time: ", time.Since(t1)) + err2 := rpc.sendMsgToWriter(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus) + log.Info(pb.OperationID, "sendMsgToWriter ", " cost time: ", time.Since(t1)) if err2 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String()) promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter) @@ -336,7 +358,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) } var memberUserIDList []string - if flag, errCode, errMsg, memberUserIDList = messageVerification(pb); !flag { + if flag, errCode, errMsg, memberUserIDList = rpc.messageVerification(pb); !flag { promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter) return returnMsg(&replay, pb, errCode, errMsg, "", 0) } @@ -473,14 +495,14 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S t1 = time.Now() msgToMQSingle.MsgData = pb.MsgData log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) - err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) + err1 := rpc.sendMsgToWriter(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) if err1 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself - err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus) + err2 := rpc.sendMsgToWriter(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus) if err2 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) @@ -504,13 +526,13 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSuperGroupMsg result", "end rpc and return", callbackResp) return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) } - if flag, errCode, errMsg, _ = messageVerification(pb); !flag { + if flag, errCode, errMsg, _ = rpc.messageVerification(pb); !flag { promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) return returnMsg(&replay, pb, errCode, errMsg, "", 0) } msgToMQSingle.MsgData = pb.MsgData log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) - err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.GroupID, constant.OnlineStatus) + err1 := rpc.sendMsgToWriter(&msgToMQSingle, msgToMQSingle.MsgData.GroupID, constant.OnlineStatus) if err1 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) @@ -529,7 +551,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } } -func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status string) error { +func (rpc *rpcChat) sendMsgToWriter(m *pbChat.MsgDataToMQ, key string, status string) error { switch status { case constant.OnlineStatus: if m.MsgData.ContentType == constant.SignalingNotification { @@ -548,15 +570,15 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status str return nil } } - pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID) + pid, offset, err := rpc.messageWriter.SendMessage(m, key, m.OperationID) if err != nil { log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) } else { - // log.NewWarn(m.OperationID, "sendMsgToKafka client msgID ", m.MsgData.ClientMsgID) + // log.NewWarn(m.OperationID, "sendMsgToWriter client msgID ", m.MsgData.ClientMsgID) } return err case constant.OfflineStatus: - pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID) + pid, offset, err := rpc.messageWriter.SendMessage(m, key, m.OperationID) if err != nil { log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) } @@ -1014,15 +1036,15 @@ func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status s isSend := modifyMessageByUserMessageReceiveOpt(v, msgData.GroupID, constant.GroupChatType, &groupPB) if isSend { msgToMQGroup.MsgData = groupPB.MsgData - // log.Debug(groupPB.OperationID, "sendMsgToKafka, ", v, groupID, msgToMQGroup.String()) - err := rpc.sendMsgToKafka(&msgToMQGroup, v, status) + // log.Debug(groupPB.OperationID, "sendMsgToWriter, ", v, groupID, msgToMQGroup.String()) + err := rpc.sendMsgToWriter(&msgToMQGroup, v, status) if err != nil { log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) } else { *sendTag = true } } else { - log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v) + log.Debug(groupPB.OperationID, "not sendMsgToWriter, ", v) } } wg.Done() @@ -1047,14 +1069,14 @@ func (rpc *rpcChat) sendMsgToGroupOptimization(list []string, groupPB *pbChat.Se log.Error(msgToMQGroup.OperationID, "sendMsgToGroupOptimization userID nil ", msgToMQGroup.String()) continue } - err := rpc.sendMsgToKafka(&msgToMQGroup, v, status) + err := rpc.sendMsgToWriter(&msgToMQGroup, v, status) if err != nil { log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) } else { *sendTag = true } } else { - log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v) + log.Debug(groupPB.OperationID, "not sendMsgToWriter, ", v) } } wg.Done() diff --git a/pkg/proto/msg/msg.proto b/pkg/proto/msg/msg.proto index c2118c756..597dabc3a 100644 --- a/pkg/proto/msg/msg.proto +++ b/pkg/proto/msg/msg.proto @@ -131,7 +131,27 @@ message DelSuperGroupMsgResp{ int32 errCode = 1; string errMsg = 2; } +message GetSuperGroupMsgReq{ + string operationID = 1; + uint32 Seq = 2; + string groupID = 3; + +} +message GetSuperGroupMsgResp{ + int32 errCode = 1; + string errMsg = 2; + server_api_params.MsgData msgData = 3; +} +message GetWriteDiffMsgReq{ + string operationID = 1; + uint32 Seq = 2; + } +message GetWriteDiffMsgResp{ + int32 errCode = 1; + string errMsg = 2; + server_api_params.MsgData msgData = 3; +} service msg { @@ -144,5 +164,7 @@ service msg { rpc SetMsgMinSeq(SetMsgMinSeqReq) returns(SetMsgMinSeqResp); rpc SetSendMsgStatus(SetSendMsgStatusReq) returns(SetSendMsgStatusResp); rpc GetSendMsgStatus(GetSendMsgStatusReq) returns(GetSendMsgStatusResp); + rpc GetSuperGroupMsg(GetSuperGroupMsgReq) returns(GetSuperGroupMsgResp); + rpc GetWriteDiffMsg(GetWriteDiffMsgReq) returns(GetWriteDiffMsgResp); }