diff --git a/cmd/open_im_api/main.go b/cmd/open_im_api/main.go index 294f79d9c..61d3e127e 100644 --- a/cmd/open_im_api/main.go +++ b/cmd/open_im_api/main.go @@ -108,6 +108,7 @@ func main() { thirdGroup.POST("/upload_update_app", apiThird.UploadUpdateApp) thirdGroup.POST("/get_download_url", apiThird.GetDownloadURL) thirdGroup.POST("/get_rtc_invitation_info", apiThird.GetRTCInvitationInfo) + thirdGroup.POST("/get_rtc_invitation_start_app", apiThird.GetRTCInvitationInfoStartApp) } //Message chatGroup := r.Group("/msg") diff --git a/internal/api/third/rtc.go b/internal/api/third/rtc.go index 9014b6c44..30fd5be7b 100644 --- a/internal/api/third/rtc.go +++ b/internal/api/third/rtc.go @@ -31,7 +31,7 @@ func GetRTCInvitationInfo(c *gin.Context) { return } var err error - invitationInfo, err := db.DB.GetSignalInfoFromCache(req.ClientMsgID) + invitationInfo, err := db.DB.GetSignalInfoFromCacheByClientMsgID(req.ClientMsgID) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetSignalInfoFromCache", err.Error(), req) c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": err.Error()}) @@ -47,3 +47,42 @@ func GetRTCInvitationInfo(c *gin.Context) { resp.Data.Invitation.Timeout = invitationInfo.Invitation.Timeout c.JSON(http.StatusOK, resp) } + +func GetRTCInvitationInfoStartApp(c *gin.Context) { + var ( + req api.GetRTCInvitationInfoStartAppReq + resp api.GetRTCInvitationInfoStartAppResp + ) + if err := c.Bind(&req); err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "BindJSON failed ", err.Error()) + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) + return + } + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req) + var ok bool + var errInfo string + ok, userID, errInfo := token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID) + if !ok { + errMsg := req.OperationID + " " + "GetUserIDFromToken failed " + errInfo + " token:" + c.Request.Header.Get("token") + log.NewError(req.OperationID, errMsg) + c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": errMsg}) + return + } + + invitationInfo, err := db.DB.GetAvailableSignalInvitationInfo(userID) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetSignalInfoFromCache", err.Error(), req) + c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": err.Error()}) + return + } + resp.Data.OpUserID = invitationInfo.OpUserID + resp.Data.Invitation.RoomID = invitationInfo.Invitation.RoomID + resp.Data.Invitation.SessionType = invitationInfo.Invitation.SessionType + resp.Data.Invitation.GroupID = invitationInfo.Invitation.GroupID + resp.Data.Invitation.InviterUserID = invitationInfo.Invitation.InviterUserID + resp.Data.Invitation.InviteeUserIDList = invitationInfo.Invitation.InviteeUserIDList + resp.Data.Invitation.MediaType = invitationInfo.Invitation.MediaType + resp.Data.Invitation.Timeout = invitationInfo.Invitation.Timeout + c.JSON(http.StatusOK, resp) + +} diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index ccb1cd6eb..47b58cde2 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -239,7 +239,7 @@ func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) { } else { log.NewInfo(pbData.OperationID, "rpc call success to sendMsgReq", reply.String()) // save invitation info for offline push - if err := db.DB.CacheSignalInfo(pbData.MsgData); err != nil { + if err := db.DB.NewCacheSignalInfo(pbData.MsgData); err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), m, &signalResp) ws.sendSignalMsgResp(conn, 200, err.Error(), m, &signalResp) } else { diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 5b5f66bae..3ba1e58fb 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -119,7 +119,6 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offlinePush stop") break } - if offlinePusher == nil { break } diff --git a/pkg/base_info/third_api_struct.go b/pkg/base_info/third_api_struct.go index 39dff1bb1..758c3494c 100644 --- a/pkg/base_info/third_api_struct.go +++ b/pkg/base_info/third_api_struct.go @@ -60,7 +60,7 @@ type GetDownloadURLResp struct { type GetRTCInvitationInfoReq struct { OperationID string `json:"operationID" binding:"required"` - ClientMsgID string `json:"clientMsgID"` + ClientMsgID string `json:"clientMsgID" binding:"required"` } type GetRTCInvitationInfoResp struct { @@ -79,3 +79,11 @@ type GetRTCInvitationInfoResp struct { OfflinePushInfo struct{} `json:"offlinePushInfo"` } `json:"data"` } + +type GetRTCInvitationInfoStartAppReq struct { + OperationID string `json:"operationID" binding:"required"` +} + +type GetRTCInvitationInfoStartAppResp struct { + GetRTCInvitationInfoResp +} diff --git a/pkg/common/db/model.go b/pkg/common/db/model.go index f8aaca54a..e7282e596 100644 --- a/pkg/common/db/model.go +++ b/pkg/common/db/model.go @@ -8,6 +8,7 @@ import ( //"Open_IM/pkg/common/log" "Open_IM/pkg/utils" "fmt" + go_redis "github.com/go-redis/redis/v8" "go.mongodb.org/mongo-driver/mongo/options" // "context" @@ -30,6 +31,7 @@ type DataBases struct { mgoSession *mgo.Session redisPool *redis.Pool mongoClient *mongo.Client + rdb *go_redis.Client } func key(dbAddress, dbName string) string { @@ -113,6 +115,12 @@ func init() { ) }, } + DB.rdb = go_redis.NewClient(&go_redis.Options{ + Addr: config.Config.Redis.DBAddress, + Password: config.Config.Redis.DBPassWord, // no password set + DB: 0, // use default DB + PoolSize: 100, // 连接池大小 + }) } func createMongoIndex(client *mongo.Client, collection string, isUnique bool, keys ...string) error { diff --git a/pkg/common/db/newRedisModel.go b/pkg/common/db/newRedisModel.go new file mode 100644 index 000000000..b477c20ea --- /dev/null +++ b/pkg/common/db/newRedisModel.go @@ -0,0 +1,157 @@ +package db + +import ( + "Open_IM/pkg/common/config" + log2 "Open_IM/pkg/common/log" + pbChat "Open_IM/pkg/proto/chat" + pbRtc "Open_IM/pkg/proto/rtc" + pbCommon "Open_IM/pkg/proto/sdk_ws" + "Open_IM/pkg/utils" + "context" + "errors" + "fmt" + "github.com/garyburd/redigo/redis" + goRedis "github.com/go-redis/redis/v8" + "github.com/golang/protobuf/proto" + "github.com/mitchellh/mapstructure" + "strconv" + "time" +) + +//func (d * DataBases)pubMessage(channel, msg string) { +// d.rdb.Publish(context.Background(),channel,msg) +//} +//func (d * DataBases)pubMessage(channel, msg string) { +// d.rdb.Publish(context.Background(),channel,msg) +//} + +func (d *DataBases) NewGetMessageListBySeq(userID string, seqList []uint32, operationID string) (seqMsg []*pbCommon.MsgData, failedSeqList []uint32, errResult error) { + for _, v := range seqList { + //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 + key := messageCache + userID + "_" + strconv.Itoa(int(v)) + + result, err := d.rdb.HGetAll(context.Background(), key).Result() + if err != nil { + errResult = err + failedSeqList = append(failedSeqList, v) + log2.NewWarn(operationID, "redis get message error:", err.Error(), v) + } else { + msg, err := Map2Pb(result) + //msg := pbCommon.MsgData{} + //err = jsonpb.UnmarshalString(result, &msg) + if err != nil { + errResult = err + failedSeqList = append(failedSeqList, v) + log2.NewWarn(operationID, "Unmarshal err", result, err.Error()) + } else { + log2.NewDebug(operationID, "redis get msg is ", msg.String()) + seqMsg = append(seqMsg, msg) + } + + } + } + return seqMsg, failedSeqList, errResult +} +func Map2Pb(m map[string]string) (*pbCommon.MsgData, error) { + var data pbCommon.MsgData + err := mapstructure.Decode(m, &data) + if err != nil { + return nil, err + } + return &data, nil +} + +func (d *DataBases) NewSetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error { + ctx := context.Background() + var failedList []pbChat.MsgDataToMQ + for _, msg := range msgList { + key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq)) + s, err := utils.Pb2Map(msg.MsgData) + if err != nil { + log2.NewWarn(operationID, utils.GetSelfFuncName(), "Pb2Map failed", msg.MsgData.String(), uid, err.Error()) + continue + } + log2.NewDebug(operationID, "convert map is ", s) + fmt.Println("ts", s) + err = d.rdb.HMSet(context.Background(), key, s).Err() + //err = d.rdb.HMSet(context.Background(), "12", map[string]interface{}{"1": 2, "343": false}).Err() + if err != nil { + return err + log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", key, *msg, uid, s, err.Error()) + failedList = append(failedList, *msg) + } + d.rdb.Expire(ctx, key, time.Second*time.Duration(config.Config.MsgCacheTimeout)) + } + if len(failedList) != 0 { + return errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, operationID)) + } + return nil +} + +func (d *DataBases) CleanUpOneUserAllMsgFromRedis(userID string, operationID string) error { + ctx := context.Background() + key := messageCache + userID + "_" + "*" + vals, err := d.rdb.Keys(ctx, key).Result() + log2.Debug(operationID, "vals: ", vals) + if err == redis.ErrNil { + return nil + } + if err != nil { + return utils.Wrap(err, "") + } + if err = d.rdb.Del(ctx, vals...).Err(); err != nil { + return utils.Wrap(err, "") + } + return nil +} + +func (d *DataBases) NewCacheSignalInfo(msg *pbCommon.MsgData) error { + keyList := SignalListCache + msg.RecvID + timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout) + if err != nil { + return err + } + err = d.rdb.LPush(context.Background(), keyList, msg.ClientMsgID).Err() + if err != nil { + return err + } + err = d.rdb.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err() + if err != nil { + return err + } + key := SignalCache + msg.ClientMsgID + err = d.rdb.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err() + if err != nil { + return err + } + err = d.rdb.Expire(context.Background(), key, time.Duration(timeout)*time.Second).Err() + if err != nil { + return err + } + return err +} + +func (d *DataBases) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { + key := SignalCache + clientMsgID + bytes, err := d.rdb.Get(context.Background(), key).Bytes() + if err != nil { + return nil, err + } + req := &pbRtc.SignalReq{} + if err = proto.Unmarshal(bytes, req); err != nil { + return nil, err + } + req2 := req.Payload.(*pbRtc.SignalReq_Invite) + invitationInfo = req2.Invite + return invitationInfo, err +} + +func (d *DataBases) GetAvailableSignalInvitationInfo(userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { + keyList := SignalListCache + userID + result := d.rdb.RPop(context.Background(), keyList) + if err = result.Err(); err != nil { + return nil, err + } + invitationInfo, err = d.GetSignalInfoFromCacheByClientMsgID(result.String()) + return invitationInfo, err +} diff --git a/pkg/common/db/redisModel.go b/pkg/common/db/redisModel.go index b1c72e12a..4e360802f 100644 --- a/pkg/common/db/redisModel.go +++ b/pkg/common/db/redisModel.go @@ -32,7 +32,8 @@ const ( blackListCache = "BLACK_LIST_CACHE:" groupCache = "GROUP_CACHE:" messageCache = "MESSAGE_CACHE:" - SignalCache = "Signal_CACHE:" + SignalCache = "SIGNAL_CACHE:" + SignalListCache = "SIGNAL_ZSET_CACHE:" ) func (d *DataBases) Exec(cmd string, key interface{}, args ...interface{}) (interface{}, error) {