From 366f7621fae6dee8325a16716b2b76aecb2b1bd4 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 17 Mar 2022 19:00:05 +0800 Subject: [PATCH] rtc add --- cmd/open_im_api/main.go | 1 + go.mod | 1 + internal/api/chat/del_msg.go | 43 +++++++++++++++++++++++++++ internal/rpc/msg/del_msg.go | 11 +++++-- pkg/base_info/msg.go | 6 +++- pkg/common/constant/constant.go | 4 +++ pkg/common/db/mongoModel.go | 51 +++++++++++++++++++++++++++++++++ 7 files changed, 113 insertions(+), 4 deletions(-) create mode 100644 internal/api/chat/del_msg.go diff --git a/cmd/open_im_api/main.go b/cmd/open_im_api/main.go index 033688f7e..7329d187d 100644 --- a/cmd/open_im_api/main.go +++ b/cmd/open_im_api/main.go @@ -85,6 +85,7 @@ func main() { chatGroup.POST("/newest_seq", apiChat.GetSeq) chatGroup.POST("/send_msg", apiChat.SendMsg) chatGroup.POST("/pull_msg_by_seq", apiChat.PullMsgBySeqList) + chatGroup.POST("/del_msg", apiChat.DelMsg) } //Manager managementGroup := r.Group("/manager") diff --git a/go.mod b/go.mod index a1afd34d0..4b9da4af5 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/gin-gonic/gin v1.7.0 github.com/go-playground/validator/v10 v10.4.1 github.com/go-sql-driver/mysql v1.6.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.1.0 github.com/golang/protobuf v1.5.2 github.com/golang/snappy v0.0.4 // indirect diff --git a/internal/api/chat/del_msg.go b/internal/api/chat/del_msg.go new file mode 100644 index 000000000..81b705953 --- /dev/null +++ b/internal/api/chat/del_msg.go @@ -0,0 +1,43 @@ +package apiChat + +import ( + api "Open_IM/pkg/base_info" + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/log" + "Open_IM/pkg/grpc-etcdv3/getcdv3" + pbChat "Open_IM/pkg/proto/chat" + pbCommon "Open_IM/pkg/proto/sdk_ws" + "Open_IM/pkg/utils" + "context" + "github.com/gin-gonic/gin" + "net/http" + "strings" +) + +func DelMsg(c *gin.Context) { + var ( + req api.DelMsgReq + resp api.DelMsgResp + reqPb pbCommon.DelMsgListReq + ) + if err := c.BindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) + return + } + if err := utils.CopyStructFields(&reqPb, &req); err != nil { + log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "CopyStructFields", err.Error()) + } + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req:", req) + grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) + msgClient := pbChat.NewChatClient(grpcConn) + respPb, err := msgClient.DelMsgList(context.Background(), &reqPb) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "DelMsgList failed", err.Error(), reqPb) + c.JSON(http.StatusOK, gin.H{"errCode": constant.ErrServer.ErrCode, "errMsg": constant.ErrServer.ErrMsg + err.Error()}) + return + } + resp.ErrCode = respPb.ErrCode + resp.ErrMsg = respPb.ErrMsg + c.JSON(http.StatusOK, resp) +} diff --git a/internal/rpc/msg/del_msg.go b/internal/rpc/msg/del_msg.go index ce1303eb7..838312d40 100644 --- a/internal/rpc/msg/del_msg.go +++ b/internal/rpc/msg/del_msg.go @@ -1,6 +1,8 @@ package msg import ( + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" commonPb "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" @@ -9,10 +11,13 @@ import ( func (rpc *rpcChat) DelMsgList(_ context.Context, req *commonPb.DelMsgListReq) (*commonPb.DelMsgListResp, error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) - //if err := db.DelMsg(req.UserID, req.SeqList); err != nil { - // log.NewError(req.OperationID, utils.GetSelfFuncName(), "DelMsg failed", err.Error()) - //} resp := &commonPb.DelMsgListResp{} + if err := db.DB.DelMsgLogic(req.UserID, req.SeqList, req.OperationID); err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "DelMsg failed", err.Error()) + resp.ErrMsg = constant.ErrDB.ErrMsg + resp.ErrCode = constant.ErrDB.ErrCode + return resp, err + } log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String()) return resp, nil } diff --git a/pkg/base_info/msg.go b/pkg/base_info/msg.go index d7aa11a82..bc80cf304 100644 --- a/pkg/base_info/msg.go +++ b/pkg/base_info/msg.go @@ -1,8 +1,12 @@ package base_info type DelMsgReq struct { - OperationID string `json:"operationID"` + OpUserID string `json:"opUserID,omitempty"` + UserID string `json:"userID,omitempty"` + SeqList []uint32 `json:"seqList,omitempty"` + OperationID string `json:"operationID,omitempty"` } type DelMsgResp struct { + CommResp } diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index d89ae677c..ff5506c75 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -83,6 +83,10 @@ const ( SignalingNotificationEnd = 1699 NotificationEnd = 2000 + //status + MsgNormal = 1 + MsgDeleted = 3 + //MsgFrom UserMsgType = 100 SysMsgType = 200 diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index c63d07f72..e0e8acf68 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -2,12 +2,16 @@ package db import ( "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" pbMsg "Open_IM/pkg/proto/chat" open_im_sdk "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" "errors" + "fmt" + "github.com/gogo/protobuf/sortkeys" + //"github.com/garyburd/redigo/redis" "github.com/golang/protobuf/proto" "gopkg.in/mgo.v2/bson" @@ -69,6 +73,41 @@ func (d *DataBases) GetMinSeqFromMongo2(uid string) (MinSeq uint32, err error) { return 1, nil } +// deleteMsgByLogic +func (d *DataBases) DelMsgLogic(uid string, seqList []uint32, operationID string) error { + sortkeys.Uint32s(seqList) + seqMsgs, err := d.GetMsgBySeqListMongo2(uid, seqList, operationID) + if err != nil { + return err + } + for _, seqMsg := range seqMsgs { + seqMsg.Status = constant.MsgDeleted + if err = d.ReplaceMsgBySeq(uid, seqMsg, operationID); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), "ReplaceMsgListBySeq error", err.Error()) + } + } + return nil +} + +func (d *DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operationID string) error { + log.NewInfo(operationID, utils.GetSelfFuncName(), uid, msg) + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + uid = getSeqUid(uid, msg.Seq) + seqIndex := getMsgIndex(msg.Seq) + s := fmt.Sprintf("msg.%d.msg", seqIndex) + log.NewDebug(operationID, utils.GetSelfFuncName(), seqIndex, s) + updateResult, err := c.UpdateOne( + ctx, bson.M{"uid": uid}, + bson.M{"$set": bson.M{s: msg}}) + log.NewInfo(operationID, utils.GetSelfFuncName(), updateResult) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), "UpdateOne", err.Error()) + return err + } + return nil +} + func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { log.NewInfo(operationID, utils.GetSelfFuncName(), uid, seqList) var hasSeqList []uint32 @@ -392,6 +431,18 @@ func getSeqUid(uid string, seq uint32) string { seqSuffix := seq / singleGocMsgNum return indexGen(uid, seqSuffix) } + +func getMsgIndex(seq uint32) int { + seqSuffix := seq / singleGocMsgNum + var index uint32 + if seqSuffix == 0 { + index = (seq - seqSuffix*5000) - 1 + } else { + index = seq - seqSuffix*singleGocMsgNum + } + return int(index) +} + func isContainInt32(target uint32, List []uint32) bool { for _, element := range List {