message reaction

pull/329/head
Gordon 2 years ago
parent f87eed1336
commit 25aa6a4345

@ -35,7 +35,7 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.S
log.Debug(req.OperationID, "redis handle firstly", req.String())
rResp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
for k, v := range req.ReactionExtensionList {
err := lockMessageTypeKey(req.ClientMsgID, k)
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
if err != nil {
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
continue
@ -54,14 +54,40 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.S
log.Error(req.OperationID, "SetMessageReactionExpire err:", err.Error(), req.String())
}
} else {
//mongo处理
for k, v := range req.ReactionExtensionList {
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
if err != nil {
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
continue
}
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k)
if err != nil && err != go_redis.Nil {
setKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, k, v)
continue
}
temp := new(server_api_params.KeyValue)
utils.JsonStringToStruct(redisValue, temp)
if v.LatestUpdateTime != temp.LatestUpdateTime {
setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp)
continue
} else {
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
if newerr != nil {
setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, temp)
continue
}
setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v)
}
}
}
} else {
log.Debug(req.OperationID, "redis handle secondly", req.String())
for k, v := range req.ReactionExtensionList {
err := lockMessageTypeKey(req.ClientMsgID, k)
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
if err != nil {
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
continue
@ -178,7 +204,7 @@ func (rpc *rpcChat) DeleteMessageReactionExtensions(ctx context.Context, req *ms
if isExists {
log.Debug(req.OperationID, "redis handle this delete", req.String())
for _, v := range req.ReactionExtensionList {
err := lockMessageTypeKey(req.ClientMsgID, v.TypeKey)
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, v.TypeKey)
if err != nil {
setDeleteKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, v.TypeKey, v)
continue
@ -210,16 +236,3 @@ func (rpc *rpcChat) DeleteMessageReactionExtensions(ctx context.Context, req *ms
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String())
return &rResp, nil
}
func lockMessageTypeKey(clientMsgID, typeKey string) (err error) {
for i := 0; i < 3; i++ {
err = db.DB.LockMessageTypeKey(clientMsgID, typeKey)
if err != nil {
time.Sleep(time.Millisecond * 100)
continue
} else {
break
}
}
return err
}

@ -0,0 +1,32 @@
package msg
import (
"Open_IM/pkg/common/db"
"time"
)
type MessageLocker interface {
LockMessageTypeKey(clientMsgID, typeKey string) (err error)
UnLockMessageTypeKey(clientMsgID string, typeKey string) error
}
type LockerMessage struct{}
func NewLockerMessage() *LockerMessage {
return &LockerMessage{}
}
func (l *LockerMessage) LockMessageTypeKey(clientMsgID, typeKey string) (err error) {
for i := 0; i < 3; i++ {
err = db.DB.LockMessageTypeKey(clientMsgID, typeKey)
if err != nil {
time.Sleep(time.Millisecond * 100)
continue
} else {
break
}
}
return err
}
func (l *LockerMessage) UnLockMessageTypeKey(clientMsgID string, typeKey string) error {
return db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
}

@ -31,7 +31,8 @@ type rpcChat struct {
etcdAddr []string
messageWriter MessageWriter
//offlineProducer *kafka.Producer
delMsgCh chan deleteMsg
delMsgCh chan deleteMsg
dMessageLocker MessageLocker
}
type deleteMsg struct {
@ -48,6 +49,7 @@ func NewRpcChatServer(port int) *rpcChat {
rpcRegisterName: config.Config.RpcRegisterName.OpenImMsgName,
etcdSchema: config.Config.Etcd.EtcdSchema,
etcdAddr: config.Config.Etcd.EtcdAddr,
dMessageLocker: NewLockerMessage(),
}
rc.messageWriter = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic)
//rc.offlineProducer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.Ws2mschatOffline.Topic)

Loading…
Cancel
Save