Merge branch 'tuoyun' of github.com:OpenIMSDK/Open-IM-Server into tuoyun

pull/232/head
wangchuxiao 2 years ago
commit 2b596e226a

@ -230,7 +230,10 @@ chatpersistencemysql: true
reliablestorage: false reliablestorage: false
#消息缓存时间 #消息缓存时间
msgCacheTimeout: 1800 msgCacheTimeout: 1800
#群聊已读开启
groupMessageHasReadReceiptEnable: false
#单聊已读开启
singleMessageHasReadReceiptEnable: false
#token config #token config
tokenpolicy: tokenpolicy:
accessSecret: "open_im_server" #token生成相关默认即可 accessSecret: "open_im_server" #token生成相关默认即可

@ -466,7 +466,9 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
//och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} //och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}}
//sess.MarkMessage(msg, "") //sess.MarkMessage(msg, "")
rwLock.Lock() rwLock.Lock()
if len(msg.Value) != 0 {
cMsg = append(cMsg, msg) cMsg = append(cMsg, msg)
}
rwLock.Unlock() rwLock.Unlock()
sess.MarkMessage(msg, "") sess.MarkMessage(msg, "")
//och.TriggerCmd(OnlineTopicBusy) //och.TriggerCmd(OnlineTopicBusy)
@ -545,7 +547,7 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName) grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName)
if grpcConn == nil { if grpcConn == nil {
log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String()) log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String())
pid, offset, err := producer.SendMessage(&mqPushMsg) pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID)
if err != nil { if err != nil {
log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
} }
@ -555,7 +557,7 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
_, err := msgClient.PushMsg(context.Background(), &rpcPushMsg) _, err := msgClient.PushMsg(context.Background(), &rpcPushMsg)
if err != nil { if err != nil {
log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error()) log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error())
pid, offset, err := producer.SendMessage(&mqPushMsg) pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID)
if err != nil { if err != nil {
log.Error(message.OperationID, "kafka send failed", mqPushMsg.OperationID, "send data", mqPushMsg.String(), "pid", pid, "offset", offset, "err", err.Error()) log.Error(message.OperationID, "kafka send failed", mqPushMsg.OperationID, "send data", mqPushMsg.String(), "pid", pid, "offset", offset, "err", err.Error())
} }

@ -34,7 +34,7 @@ func (pc *PersistentConsumerHandler) Init() {
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) { func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
msg := cMsg.Value msg := cMsg.Value
log.NewInfo("msg come here mysql!!!", "", "msg", string(msg)) log.NewInfo("msg come here mysql!!!", "", "msg", string(msg), msgKey)
var tag bool var tag bool
msgFromMQ := pbMsg.MsgDataToMQ{} msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg, &msgFromMQ) err := proto.Unmarshal(msg, &msgFromMQ)
@ -42,6 +42,7 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMes
log.NewError(msgFromMQ.OperationID, "msg_transfer Unmarshal msg err", "msg", string(msg), "err", err.Error()) log.NewError(msgFromMQ.OperationID, "msg_transfer Unmarshal msg err", "msg", string(msg), "err", err.Error())
return return
} }
log.Debug(msgFromMQ.OperationID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String())
//Control whether to store history messages (mysql) //Control whether to store history messages (mysql)
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
//Only process receiver data //Only process receiver data
@ -71,8 +72,12 @@ func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error { claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() { for msg := range claim.Messages() {
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
if len(msg.Value) != 0 {
pc.msgHandle[msg.Topic](msg, string(msg.Key), sess) pc.msgHandle[msg.Topic](msg, string(msg.Key), sess)
} else {
log.Error("", "msg get from kafka but is nil", msg.Key)
}
sess.MarkMessage(msg, "") sess.MarkMessage(msg, "")
} }
return nil return nil

@ -49,6 +49,24 @@ type MsgCallBackResp struct {
} }
} }
func isMessageHasReadEnabled(pb *pbChat.SendMsgReq) (bool, int32, string) {
switch pb.MsgData.ContentType {
case constant.HasReadReceipt:
if config.Config.SingleMessageHasReadReceiptEnable {
return true, 0, ""
} else {
return false, constant.ErrMessageHasReadDisable.ErrCode, constant.ErrMessageHasReadDisable.ErrMsg
}
case constant.GroupHasReadReceipt:
if config.Config.GroupMessageHasReadReceiptEnable {
return true, 0, ""
} else {
return false, constant.ErrMessageHasReadDisable.ErrCode, constant.ErrMessageHasReadDisable.ErrMsg
}
}
return true, 0, ""
}
func userRelationshipVerification(data *pbChat.SendMsgReq) (bool, int32, string) { func userRelationshipVerification(data *pbChat.SendMsgReq) (bool, int32, string) {
if data.MsgData.SessionType == constant.GroupChatType { if data.MsgData.SessionType == constant.GroupChatType {
return true, 0, "" return true, 0, ""
@ -144,7 +162,12 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
replay := pbChat.SendMsgResp{} replay := pbChat.SendMsgResp{}
newTime := db.GetCurrentTimestampByMill() newTime := db.GetCurrentTimestampByMill()
log.NewWarn(pb.OperationID, "rpc sendMsg come here", pb.String(), pb.MsgData.ClientMsgID) log.NewWarn(pb.OperationID, "rpc sendMsg come here", pb.String(), pb.MsgData.ClientMsgID)
flag, errCode, errMsg := userRelationshipVerification(pb) flag, errCode, errMsg := isMessageHasReadEnabled(pb)
if !flag {
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
}
log.Debug(pb.OperationID, "flag ", flag, config.Config.SingleMessageHasReadReceiptEnable, config.Config.GroupMessageHasReadReceiptEnable)
flag, errCode, errMsg = userRelationshipVerification(pb)
if !flag { if !flag {
return returnMsg(&replay, pb, errCode, errMsg, "", 0) return returnMsg(&replay, pb, errCode, errMsg, "", 0)
} }
@ -376,7 +399,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status string) error { func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status string) error {
switch status { switch status {
case constant.OnlineStatus: case constant.OnlineStatus:
pid, offset, err := rpc.onlineProducer.SendMessage(m, key) pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID)
if err != nil { if err != nil {
log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)
} else { } else {
@ -384,7 +407,7 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status str
} }
return err return err
case constant.OfflineStatus: case constant.OfflineStatus:
pid, offset, err := rpc.onlineProducer.SendMessage(m, key) pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID)
if err != nil { if err != nil {
log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)
} }

@ -219,7 +219,8 @@ type config struct {
ChatPersistenceMysql bool `yaml:"chatpersistencemysql"` ChatPersistenceMysql bool `yaml:"chatpersistencemysql"`
ReliableStorage bool `yaml:"reliablestorage"` ReliableStorage bool `yaml:"reliablestorage"`
MsgCacheTimeout int `yaml:"msgCacheTimeout"` MsgCacheTimeout int `yaml:"msgCacheTimeout"`
GroupMessageHasReadReceiptEnable bool `yaml:"groupMessageHasReadReceiptEnable"`
SingleMessageHasReadReceiptEnable bool `yaml:"singleMessageHasReadReceiptEnable"`
TokenPolicy struct { TokenPolicy struct {
AccessSecret string `yaml:"accessSecret"` AccessSecret string `yaml:"accessSecret"`
AccessExpire int64 `yaml:"accessExpire"` AccessExpire int64 `yaml:"accessExpire"`

@ -42,6 +42,7 @@ const (
HasReadReceipt = 112 HasReadReceipt = 112
Typing = 113 Typing = 113
Quote = 114 Quote = 114
GroupHasReadReceipt = 116
Common = 200 Common = 200
GroupMsg = 201 GroupMsg = 201
SignalMsg = 202 SignalMsg = 202

@ -55,6 +55,7 @@ var (
ErrStatus = ErrInfo{ErrCode: 804, ErrMsg: StatusMsg.Error()} ErrStatus = ErrInfo{ErrCode: 804, ErrMsg: StatusMsg.Error()}
ErrCallback = ErrInfo{ErrCode: 809, ErrMsg: CallBackMsg.Error()} ErrCallback = ErrInfo{ErrCode: 809, ErrMsg: CallBackMsg.Error()}
ErrSendLimit = ErrInfo{ErrCode: 810, ErrMsg: "send msg limit, to many request, try again later"} ErrSendLimit = ErrInfo{ErrCode: 810, ErrMsg: "send msg limit, to many request, try again later"}
ErrMessageHasReadDisable = ErrInfo{ErrCode: 811, ErrMsg: "message has read disable"}
) )
var ( var (

@ -2,6 +2,7 @@ package kafka
import ( import (
log2 "Open_IM/pkg/common/log" log2 "Open_IM/pkg/common/log"
"errors"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
) )
@ -32,18 +33,22 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
return &p return &p
} }
func (p *Producer) SendMessage(m proto.Message, key ...string) (int32, int64, error) { func (p *Producer) SendMessage(m proto.Message, key string, operationID string) (int32, int64, error) {
log2.Info(operationID, "SendMessage", "key ", key, m.String(), p.producer)
kMsg := &sarama.ProducerMessage{} kMsg := &sarama.ProducerMessage{}
kMsg.Topic = p.topic kMsg.Topic = p.topic
if len(key) == 1 { kMsg.Key = sarama.StringEncoder(key)
kMsg.Key = sarama.StringEncoder(key[0])
}
bMsg, err := proto.Marshal(m) bMsg, err := proto.Marshal(m)
if err != nil { if err != nil {
log2.Error("", "", "proto marshal err = %s", err.Error()) log2.Error(operationID, "", "proto marshal err = %s", err.Error())
return -1, -1, err return -1, -1, err
} }
if len(bMsg) == 0 {
return 0, 0, errors.New("msg content is nil")
}
kMsg.Value = sarama.ByteEncoder(bMsg) kMsg.Value = sarama.ByteEncoder(bMsg)
log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer)
return p.producer.SendMessage(kMsg) a, b, c := p.producer.SendMessage(kMsg)
log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg, p.producer)
return a, b, c
} }

Loading…
Cancel
Save