Merge branch 'superGroup' of github.com:OpenIMSDK/Open-IM-Server into superGroup

pull/236/head
wangchuxiao 3 years ago
commit f9a26e9195

@ -32,7 +32,6 @@ require (
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
github.com/olivere/elastic/v7 v7.0.23
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.1
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
@ -51,3 +50,5 @@ require (
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)
replace github.com/Shopify/sarama => github.com/Shopify/sarama v1.29.0

@ -19,19 +19,12 @@ import (
)
func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
//ws online debug data
//{"ReqIdentifier":1001,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0}
//{"ReqIdentifier":1002,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0,"SeqBegin":1,"SeqEnd":6}
//{"ReqIdentifier":1003,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b",
//"RecvID":"a87ff679a2f3e71d9181a67b7542122c","ClientMsgID":"2343","Time":"147878787","OperationID":
//"123","MsgIncr":0,"SubMsgType":101,"MsgType":100,"MsgFrom":1,"Content":"sdfsdf"}
b := bytes.NewBuffer(binaryMsg)
m := Req{}
dec := gob.NewDecoder(b)
err := dec.Decode(&m)
if err != nil {
log.NewError("", "ws Decode err", err.Error())
ws.sendErrMsg(conn, 200, err.Error(), constant.WSDataError, "", "")
err = conn.Close()
if err != nil {
log.NewError("", "ws close err", err.Error())
@ -43,29 +36,32 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
ws.sendErrMsg(conn, 201, err.Error(), m.ReqIdentifier, m.MsgIncr, m.OperationID)
return
}
//if !utils.VerifyToken(m.Token, m.SendID) {
// ws.sendErrMsg(conn, 202, "token validate err", m.ReqIdentifier, m.MsgIncr,m.OperationID)
// return
//}
log.NewInfo(m.OperationID, "Basic Info Authentication Success", m)
log.NewInfo(m.OperationID, "Basic Info Authentication Success", m.SendID, m.MsgIncr, m.ReqIdentifier)
switch m.ReqIdentifier {
case constant.WSGetNewestSeq:
log.NewInfo(m.OperationID, "getSeqReq ", m.SendID, m.MsgIncr, m.ReqIdentifier)
ws.getSeqReq(conn, &m)
case constant.WSSendMsg:
log.NewInfo(m.OperationID, "sendMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier)
ws.sendMsgReq(conn, &m)
case constant.WSSendSignalMsg:
log.NewInfo(m.OperationID, "sendSignalMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier)
ws.sendSignalMsgReq(conn, &m)
case constant.WSPullMsgBySeqList:
log.NewInfo(m.OperationID, "pullMsgBySeqListReq ", m.SendID, m.MsgIncr, m.ReqIdentifier)
ws.pullMsgBySeqListReq(conn, &m)
default:
log.Error(m.OperationID, "ReqIdentifier failed ", m.SendID, m.MsgIncr, m.ReqIdentifier)
}
log.NewInfo(m.OperationID, "goroutine num is ", runtime.NumGoroutine())
}
func (ws *WServer) getSeqReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to getNewSeq", m.MsgIncr, m.SendID, m.ReqIdentifier, m.Data)
log.NewInfo(m.OperationID, "Ws call success to getNewSeq", m.MsgIncr, m.SendID, m.ReqIdentifier)
nReply := new(sdk_ws.GetMaxAndMinSeqResp)
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSGetNewestSeq)
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSGetNewestSeq, m.OperationID)
log.Info(m.OperationID, "argsValidate ", isPass, errCode, errMsg)
if isPass {
rpcReq := sdk_ws.GetMaxAndMinSeqReq{}
rpcReq.GroupIDList = data.(sdk_ws.GetMaxAndMinSeqReq).GroupIDList
@ -76,9 +72,9 @@ func (ws *WServer) getSeqReq(conn *UserConn, m *Req) {
msgClient := pbChat.NewChatClient(grpcConn)
rpcReply, err := msgClient.GetMaxAndMinSeq(context.Background(), &rpcReq)
if err != nil {
log.Error(rpcReq.OperationID, "rpc call failed to getSeqReq", err.Error(), rpcReq.String())
nReply.ErrCode = 500
nReply.ErrMsg = err.Error()
log.Error(rpcReq.OperationID, "rpc call failed to GetMaxAndMinSeq ", nReply.String())
ws.getSeqResp(conn, m, nReply)
} else {
log.NewInfo(rpcReq.OperationID, "rpc call success to getSeqReq", rpcReply.String())
@ -87,13 +83,13 @@ func (ws *WServer) getSeqReq(conn *UserConn, m *Req) {
} else {
nReply.ErrCode = errCode
nReply.ErrMsg = errMsg
log.Error(m.OperationID, "argsValidate failed send resp: ", nReply.String())
ws.getSeqResp(conn, m, nReply)
}
}
func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdk_ws.GetMaxAndMinSeqResp) {
log.Debug(m.OperationID, "getSeqResp come here ", pb.String())
b, _ := proto.Marshal(pb)
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
@ -103,13 +99,15 @@ func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdk_ws.GetMaxAndMinSeq
OperationID: m.OperationID,
Data: b,
}
log.Debug(m.OperationID, "getSeqResp come here req: ", pb.String(), "send resp: ",
mReply.ReqIdentifier, mReply.MsgIncr, mReply.ErrCode, mReply.ErrMsg)
ws.sendMsg(conn, mReply)
}
func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq start", m.SendID, m.ReqIdentifier, m.MsgIncr, string(m.Data))
nReply := new(sdk_ws.PullMessageBySeqListResp)
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsgBySeqList)
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsgBySeqList, m.OperationID)
if isPass {
rpcReq := sdk_ws.PullMessageBySeqListReq{}
rpcReq.SeqList = data.(sdk_ws.PullMessageBySeqListReq).SeqList
@ -159,7 +157,7 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, m.Data)
nReply := new(pbChat.SendMsgResp)
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg)
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg, m.OperationID)
if isPass {
data := pData.(sdk_ws.MsgData)
pbData := pbChat.SendMsgReq{
@ -189,8 +187,6 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
}
func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.SendMsgResp) {
// := make(map[string]interface{})
var mReplyData sdk_ws.UserSendMsgResp
mReplyData.ClientMsgID = pb.GetClientMsgID()
mReplyData.ServerMsgID = pb.GetServerMsgID()
@ -210,7 +206,7 @@ func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.SendMsgResp) {
func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to sendSignalMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, string(m.Data))
nReply := new(pbChat.SendMsgResp)
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendSignalMsg)
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendSignalMsg, m.OperationID)
if isPass {
signalResp := pbRtc.SignalResp{}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImRealTimeCommName)
@ -260,7 +256,7 @@ func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) {
}
func (ws *WServer) sendSignalMsgResp(conn *UserConn, errCode int32, errMsg string, m *Req, pb *pbRtc.SignalResp) {
// := make(map[string]interface{})
log.Debug(m.OperationID, "SignalMsgResp is", pb.String())
log.Debug(m.OperationID, "sendSignalMsgResp is", pb.String())
b, _ := proto.Marshal(pb)
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
@ -277,14 +273,14 @@ func (ws *WServer) sendMsg(conn *UserConn, mReply interface{}) {
enc := gob.NewEncoder(&b)
err := enc.Encode(mReply)
if err != nil {
uid, platform := ws.getUserUid(conn)
log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "Encode Msg error", conn.RemoteAddr().String(), uid, platform, err.Error())
// uid, platform := ws.getUserUid(conn)
log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "Encode Msg error", conn.RemoteAddr().String(), err.Error())
return
}
err = ws.writeMsg(conn, websocket.BinaryMessage, b.Bytes())
if err != nil {
uid, platform := ws.getUserUid(conn)
log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "ws writeMsg error", conn.RemoteAddr().String(), uid, platform, err.Error())
// uid, platform := ws.getUserUid(conn)
log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "ws writeMsg error", conn.RemoteAddr().String(), err.Error())
} else {
log.Debug(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "ws write response success")
}

@ -57,16 +57,16 @@ type SeqListData struct {
SeqList []int64 `mapstructure:"seqList" validate:"required"`
}
func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, returnData interface{}) {
func (ws *WServer) argsValidate(m *Req, r int32, operationID string) (isPass bool, errCode int32, errMsg string, returnData interface{}) {
switch r {
case constant.WSGetNewestSeq:
data := open_im_sdk.GetMaxAndMinSeqReq{}
if err := proto.Unmarshal(m.Data, &data); err != nil {
log.Error("", "Decode Data struct err", err.Error(), r)
log.Error(operationID, "Decode Data struct err", err.Error(), r)
return false, 203, err.Error(), nil
}
if err := validate.Struct(data); err != nil {
log.Error("", "data args validate err", err.Error(), r)
log.Error(operationID, "data args validate err", err.Error(), r)
return false, 204, err.Error(), nil
}
@ -74,11 +74,11 @@ func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, er
case constant.WSSendMsg:
data := open_im_sdk.MsgData{}
if err := proto.Unmarshal(m.Data, &data); err != nil {
log.Error("", "Decode Data struct err", err.Error(), r)
log.Error(operationID, "Decode Data struct err", err.Error(), r)
return false, 203, err.Error(), nil
}
if err := validate.Struct(data); err != nil {
log.Error("", "data args validate err", err.Error(), r)
log.Error(operationID, "data args validate err", err.Error(), r)
return false, 204, err.Error(), nil
}
@ -86,11 +86,11 @@ func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, er
case constant.WSSendSignalMsg:
data := pbRtc.SignalReq{}
if err := proto.Unmarshal(m.Data, &data); err != nil {
log.Error("", "Decode Data struct err", err.Error(), r)
log.Error(operationID, "Decode Data struct err", err.Error(), r)
return false, 203, err.Error(), nil
}
if err := validate.Struct(data); err != nil {
log.Error("", "data args validate err", err.Error(), r)
log.Error(operationID, "data args validate err", err.Error(), r)
return false, 204, err.Error(), nil
}
@ -98,31 +98,16 @@ func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, er
case constant.WSPullMsgBySeqList:
data := open_im_sdk.PullMessageBySeqListReq{}
if err := proto.Unmarshal(m.Data, &data); err != nil {
log.Error("", "Decode Data struct err", err.Error(), r)
log.Error(operationID, "Decode Data struct err", err.Error(), r)
return false, 203, err.Error(), nil
}
if err := validate.Struct(data); err != nil {
log.Error("", "data args validate err", err.Error(), r)
log.Error(operationID, "data args validate err", err.Error(), r)
return false, 204, err.Error(), nil
}
return true, 0, "", data
default:
}
return false, 204, "args err", nil
//b := bytes.NewBuffer(m.Data)
//dec := gob.NewDecoder(b)
//err := dec.Decode(&data)
//if err != nil {
// log.ErrorByKv("Decode Data struct err", "", "err", err.Error(), "reqIdentifier", r)
// return false, 203, err.Error(), nil
//}
//if err := mapstructure.WeakDecode(m.Data, &data); err != nil {
// log.ErrorByKv("map to Data struct err", "", "err", err.Error(), "reqIdentifier", r)
// return false, 203, err.Error(), nil
//} else
}

@ -51,21 +51,27 @@ func (ws *WServer) run() {
}
func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
if ws.headerCheck(w, r) {
query := r.URL.Query()
query := r.URL.Query()
operationID := ""
if len(query["operationID"]) != 0 {
operationID = query["operationID"][0]
} else {
operationID = utils.OperationIDGenerator()
}
log.Debug(operationID, utils.GetSelfFuncName(), " args: ", query)
if ws.headerCheck(w, r, operationID) {
conn, err := ws.wsUpGrader.Upgrade(w, r, nil) //Conn is obtained through the upgraded escalator
if err != nil {
log.Error("", "upgrade http conn err", err, query)
log.Error(operationID, "upgrade http conn err", err.Error(), query)
return
} else {
//Connection mapping relationship,
//userID+" "+platformID->conn
//Initialize a lock for each user
newConn := &UserConn{conn, new(sync.Mutex), 0}
userCount++
ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0])
ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], operationID)
go ws.readMsg(newConn)
}
} else {
log.Error(operationID, "headerCheck failed ")
}
}
@ -76,18 +82,13 @@ func (ws *WServer) readMsg(conn *UserConn) {
log.NewInfo("", "this is a pingMessage")
}
if err != nil {
uid, platform := ws.getUserUid(conn)
log.Error("", "WS ReadMsg error", "userIP", conn.RemoteAddr().String(), "userUid", uid, "platform", platform, "error", err.Error())
log.Error("", "WS ReadMsg error ", " userIP", conn.RemoteAddr().String(), "userUid", "platform", "error", err.Error())
userCount--
ws.delUserConn(conn)
return
} else {
//log.ErrorByKv("test", "", "msgType", msgType, "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn))
}
ws.msgParse(conn, msg)
//ws.writeMsg(conn, 1, chat)
}
}
func (ws *WServer) SetWriteTimeout(conn *UserConn, timeout int) {
@ -115,25 +116,27 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn
if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn]
if oldConn, ok := oldConnMap[platformID]; ok {
log.NewDebug(operationID, uid, platformID, "kick old conn")
ws.sendKickMsg(oldConn, newConn)
// ws.sendKickMsg(oldConn, newConn)
m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID))
if err != nil && err != go_redis.Nil {
log.NewError(operationID, "get token from redis err", err.Error(), uid)
log.NewError(operationID, "get token from redis err", err.Error(), uid, constant.PlatformIDToName(platformID))
return
}
if m == nil {
log.NewError(operationID, "get token from redis err", "m is nil")
log.NewError(operationID, "get token from redis err", "m is nil", uid, constant.PlatformIDToName(platformID))
return
}
log.NewDebug(operationID, "get token map is ", m, uid, constant.PlatformIDToName(platformID))
for k, _ := range m {
if k != token {
m[k] = constant.KickedToken
}
}
log.NewDebug(operationID, "get map is ", m)
log.NewDebug(operationID, "set token map is ", m, uid, constant.PlatformIDToName(platformID))
err = db.DB.SetTokenMapByUidPid(uid, platformID, m)
if err != nil {
log.NewError(operationID, "SetTokenMapByUidPid err", err.Error())
log.NewError(operationID, "SetTokenMapByUidPid err", err.Error(), uid, platformID, m)
return
}
err = oldConn.Close()
@ -146,7 +149,6 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn
if err != nil {
log.NewError(operationID, "conn close err", err.Error(), uid, platformID)
}
} else {
log.NewWarn(operationID, "abnormal uid-conn ", uid, platformID, oldConnMap[platformID])
}
@ -177,10 +179,11 @@ func (ws *WServer) sendKickMsg(oldConn, newConn *UserConn) {
log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "sendKickMsg WS WriteMsg error", oldConn.RemoteAddr().String(), newConn.RemoteAddr().String(), err.Error())
}
}
func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string) {
func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string, operationID string) {
rwLock.Lock()
defer rwLock.Unlock()
operationID := utils.OperationIDGenerator()
log.Info(operationID, utils.GetSelfFuncName(), " args: ", uid, platformID, conn, token)
callbackResp := callbackUserOnline(operationID, uid, platformID, token)
if callbackResp.ErrCode != 0 {
log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOnline resp:", callbackResp)
@ -268,43 +271,39 @@ func (ws *WServer) getUserAllCons(uid string) map[int]*UserConn {
return nil
}
func (ws *WServer) getUserUid(conn *UserConn) (uid string, platform int) {
rwLock.RLock()
defer rwLock.RUnlock()
if stringMap, ok := ws.wsConnToUser[conn]; ok {
for k, v := range stringMap {
platform = k
uid = v
}
return uid, platform
}
return "", 0
}
func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request) bool {
//func (ws *WServer) getUserUid(conn *UserConn) (uid string, platform int) {
// rwLock.RLock()
// defer rwLock.RUnlock()
//
// if stringMap, ok := ws.wsConnToUser[conn]; ok {
// for k, v := range stringMap {
// platform = k
// uid = v
// }
// return uid, platform
// }
// return "", 0
//}
func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request, operationID string) bool {
status := http.StatusUnauthorized
query := r.URL.Query()
operationID := ""
if len(query["operationID"]) != 0 {
operationID = query["operationID"][0]
}
if len(query["token"]) != 0 && len(query["sendID"]) != 0 && len(query["platformID"]) != 0 {
if ok, err, msg := token_verify.WsVerifyToken(query["token"][0], query["sendID"][0], query["platformID"][0], operationID); !ok {
// e := err.(*constant.ErrInfo)
log.Error(operationID, "Token verify failed ", "query ", query, msg, err.Error())
w.Header().Set("Sec-Websocket-Version", "13")
w.Header().Set("ws_err_msg", err.Error())
http.Error(w, err.Error(), status)
return false
} else {
log.Info(operationID, "Connection Authentication Success", "", "token", query["token"][0], "userID", query["sendID"][0])
log.Info(operationID, "Connection Authentication Success", "", "token ", query["token"][0], "userID ", query["sendID"][0], "platformID ", query["platformID"][0])
return true
}
} else {
log.Error(operationID, "Args err", "query", query)
log.Error(operationID, "Args err ", "query ", query)
w.Header().Set("Sec-Websocket-Version", "13")
w.Header().Set("ws_err_msg", "args err, need token, sendID, platformID")
http.Error(w, http.StatusText(status), status)
errMsg := "args err, need token, sendID, platformID"
w.Header().Set("ws_err_msg", errMsg)
http.Error(w, errMsg, status)
return false
}
}

@ -160,29 +160,29 @@ func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) {
utils.SetSwitchFromOptions(msg.Options, constant.IsSenderConversationUpdate, false)
utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false)
utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false)
}
}
func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) {
replay := pbChat.SendMsgResp{}
newTime := db.GetCurrentTimestampByMill()
log.NewWarn(pb.OperationID, "rpc sendMsg come here", pb.String(), pb.MsgData.ClientMsgID)
log.Info(pb.OperationID, "rpc sendMsg come here ", pb.String())
flag, errCode, errMsg := isMessageHasReadEnabled(pb)
log.Info(pb.OperationID, "isMessageHasReadEnabled ", flag)
if !flag {
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
}
flag, errCode, errMsg = userRelationshipVerification(pb)
log.Info(pb.OperationID, "userRelationshipVerification ", flag)
if !flag {
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
}
rpc.encapsulateMsgData(pb.MsgData)
log.Info("", "this is a test MsgData ", pb.MsgData)
msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData}
// callback
callbackResp := callbackWordFilter(pb)
log.Info(pb.OperationID, "callbackWordFilter ", callbackResp)
if callbackResp.ErrCode != 0 {
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp)
log.Error(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp)
}
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackResp: ", callbackResp)
if callbackResp.ActionCode != constant.ActionAllow {
@ -212,7 +212,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus)
if err1 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String())
log.NewError(msgToMQSingle.OperationID, "kafka send msg err :RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String(), err1.Error())
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
}

@ -155,6 +155,7 @@ func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operati
}
func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error {
ctx := context.Background()
pipe := d.rdb.Pipeline()
var failedList []pbChat.MsgDataToMQ
for _, msg := range msgList {
key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq))
@ -164,7 +165,7 @@ func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string,
continue
}
log2.NewDebug(operationID, "convert string is ", s)
err = d.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
//err = d.rdb.HMSet(context.Background(), "12", map[string]interface{}{"1": 2, "343": false}).Err()
if err != nil {
log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", key, *msg, uid, s, err.Error())
@ -174,7 +175,8 @@ func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string,
if len(failedList) != 0 {
return errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, operationID))
}
return nil
_, err := pipe.Exec(ctx)
return err
}
func (d *DataBases) CleanUpOneUserAllMsgFromRedis(userID string, operationID string) error {

@ -2,6 +2,7 @@ package kafka
import (
log2 "Open_IM/pkg/common/log"
"Open_IM/pkg/utils"
"errors"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
@ -16,8 +17,9 @@ type Producer struct {
func NewKafkaProducer(addr []string, topic string) *Producer {
p := Producer{}
p.config = sarama.NewConfig() //Instantiate a sarama Config
p.config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully
p.config = sarama.NewConfig() //Instantiate a sarama Config
p.config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully
p.config.Producer.Return.Errors = true
p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all
p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
@ -44,11 +46,16 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string)
return -1, -1, err
}
if len(bMsg) == 0 {
return 0, 0, errors.New("msg content is nil")
log2.Error(operationID, "len(bMsg) == 0 ")
return 0, 0, errors.New("len(bMsg) == 0 ")
}
kMsg.Value = sarama.ByteEncoder(bMsg)
log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer)
log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer, "len: ", kMsg.Key.Length(), kMsg.Value.Length())
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
log2.Error(operationID, "kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 ", kMsg)
return -1, -1, errors.New("key or value == 0")
}
a, b, c := p.producer.SendMessage(kMsg)
log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg, p.producer)
return a, b, c
log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer)
return a, b, utils.Wrap(c, "")
}

@ -245,15 +245,19 @@ func VerifyToken(token, uid string) (bool, error) {
return true, nil
}
func WsVerifyToken(token, uid string, platformID string, operationID string) (bool, error, string) {
argMsg := "token: " + token + " operationID: " + operationID + " userID: " + uid + " platformID: " + platformID
claims, err := ParseToken(token, operationID)
if err != nil {
return false, utils.Wrap(err, "parse token err"), "parse token err"
errMsg := "parse token err " + argMsg
return false, utils.Wrap(err, errMsg), errMsg
}
if claims.UID != uid {
return false, utils.Wrap(&constant.ErrTokenUnknown, "uid is not same to token uid"), "uid is not same to token uid"
errMsg := " uid is not same to token uid " + " claims.UID " + claims.UID + argMsg
return false, utils.Wrap(&constant.ErrTokenUnknown, errMsg), errMsg
}
if claims.Platform != constant.PlatformIDToName(utils.StringToInt(platformID)) {
return false, utils.Wrap(&constant.ErrTokenUnknown, "platform is not same to token platform"), "platform is not same to token platform"
errMsg := " platform is not same to token platform " + argMsg + "claims platformID " + claims.Platform
return false, utils.Wrap(&constant.ErrTokenUnknown, errMsg), errMsg
}
log.NewDebug(operationID, utils.GetSelfFuncName(), " check ok ", claims.UID, uid, claims.Platform)
return true, nil, ""

Loading…
Cancel
Save