diff --git a/config/config.yaml b/config/config.yaml index e11407c5c..196b7e231 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -230,7 +230,10 @@ chatpersistencemysql: true reliablestorage: false #消息缓存时间 msgCacheTimeout: 1800 - +#群聊已读开启 +groupMessageHasReadReceiptEnable: false +#单聊已读开启 +singleMessageHasReadReceiptEnable: false #token config tokenpolicy: accessSecret: "open_im_server" #token生成相关,默认即可 diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 7baccf176..84b33cb4f 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -466,7 +466,9 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS //och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} //sess.MarkMessage(msg, "") rwLock.Lock() - cMsg = append(cMsg, msg) + if len(msg.Value) != 0 { + cMsg = append(cMsg, msg) + } rwLock.Unlock() sess.MarkMessage(msg, "") //och.TriggerCmd(OnlineTopicBusy) @@ -545,7 +547,7 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName) if grpcConn == nil { log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String()) - pid, offset, err := producer.SendMessage(&mqPushMsg) + pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) if err != nil { log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) } @@ -555,7 +557,7 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { _, err := msgClient.PushMsg(context.Background(), &rpcPushMsg) if err != nil { log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error()) - pid, offset, err := producer.SendMessage(&mqPushMsg) + pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) if err != nil { log.Error(message.OperationID, "kafka send failed", mqPushMsg.OperationID, "send data", mqPushMsg.String(), "pid", pid, "offset", offset, "err", err.Error()) } diff --git a/internal/msg_transfer/logic/persistent_msg_handler.go b/internal/msg_transfer/logic/persistent_msg_handler.go index 245634d84..338cc2606 100644 --- a/internal/msg_transfer/logic/persistent_msg_handler.go +++ b/internal/msg_transfer/logic/persistent_msg_handler.go @@ -34,7 +34,7 @@ func (pc *PersistentConsumerHandler) Init() { func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) { msg := cMsg.Value - log.NewInfo("msg come here mysql!!!", "", "msg", string(msg)) + log.NewInfo("msg come here mysql!!!", "", "msg", string(msg), msgKey) var tag bool msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(msg, &msgFromMQ) @@ -42,6 +42,7 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMes log.NewError(msgFromMQ.OperationID, "msg_transfer Unmarshal msg err", "msg", string(msg), "err", err.Error()) return } + log.Debug(msgFromMQ.OperationID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String()) //Control whether to store history messages (mysql) isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) //Only process receiver data @@ -71,8 +72,12 @@ func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { - log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - pc.msgHandle[msg.Topic](msg, string(msg.Key), sess) + log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) + if len(msg.Value) != 0 { + pc.msgHandle[msg.Topic](msg, string(msg.Key), sess) + } else { + log.Error("", "msg get from kafka but is nil", msg.Key) + } sess.MarkMessage(msg, "") } return nil diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 7897d3f3d..131a36bab 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -49,6 +49,24 @@ type MsgCallBackResp struct { } } +func isMessageHasReadEnabled(pb *pbChat.SendMsgReq) (bool, int32, string) { + switch pb.MsgData.ContentType { + case constant.HasReadReceipt: + if config.Config.SingleMessageHasReadReceiptEnable { + return true, 0, "" + } else { + return false, constant.ErrMessageHasReadDisable.ErrCode, constant.ErrMessageHasReadDisable.ErrMsg + } + case constant.GroupHasReadReceipt: + if config.Config.GroupMessageHasReadReceiptEnable { + return true, 0, "" + } else { + return false, constant.ErrMessageHasReadDisable.ErrCode, constant.ErrMessageHasReadDisable.ErrMsg + } + } + return true, 0, "" +} + func userRelationshipVerification(data *pbChat.SendMsgReq) (bool, int32, string) { if data.MsgData.SessionType == constant.GroupChatType { return true, 0, "" @@ -144,7 +162,12 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S replay := pbChat.SendMsgResp{} newTime := db.GetCurrentTimestampByMill() log.NewWarn(pb.OperationID, "rpc sendMsg come here", pb.String(), pb.MsgData.ClientMsgID) - flag, errCode, errMsg := userRelationshipVerification(pb) + flag, errCode, errMsg := isMessageHasReadEnabled(pb) + if !flag { + return returnMsg(&replay, pb, errCode, errMsg, "", 0) + } + log.Debug(pb.OperationID, "flag ", flag, config.Config.SingleMessageHasReadReceiptEnable, config.Config.GroupMessageHasReadReceiptEnable) + flag, errCode, errMsg = userRelationshipVerification(pb) if !flag { return returnMsg(&replay, pb, errCode, errMsg, "", 0) } @@ -376,7 +399,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status string) error { switch status { case constant.OnlineStatus: - pid, offset, err := rpc.onlineProducer.SendMessage(m, key) + pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID) if err != nil { log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) } else { @@ -384,7 +407,7 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status str } return err case constant.OfflineStatus: - pid, offset, err := rpc.onlineProducer.SendMessage(m, key) + pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID) if err != nil { log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 2b9a4d891..9da3fdf29 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -214,13 +214,14 @@ type config struct { MsgToPush string `yaml:"msgToPush"` } } - Secret string `yaml:"secret"` - MultiLoginPolicy int `yaml:"multiloginpolicy"` - ChatPersistenceMysql bool `yaml:"chatpersistencemysql"` - ReliableStorage bool `yaml:"reliablestorage"` - MsgCacheTimeout int `yaml:"msgCacheTimeout"` - - TokenPolicy struct { + Secret string `yaml:"secret"` + MultiLoginPolicy int `yaml:"multiloginpolicy"` + ChatPersistenceMysql bool `yaml:"chatpersistencemysql"` + ReliableStorage bool `yaml:"reliablestorage"` + MsgCacheTimeout int `yaml:"msgCacheTimeout"` + GroupMessageHasReadReceiptEnable bool `yaml:"groupMessageHasReadReceiptEnable"` + SingleMessageHasReadReceiptEnable bool `yaml:"singleMessageHasReadReceiptEnable"` + TokenPolicy struct { AccessSecret string `yaml:"accessSecret"` AccessExpire int64 `yaml:"accessExpire"` } diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 53a4cce38..a726111b3 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -28,23 +28,24 @@ const ( ///ContentType //UserRelated - Text = 101 - Picture = 102 - Voice = 103 - Video = 104 - File = 105 - AtText = 106 - Merger = 107 - Card = 108 - Location = 109 - Custom = 110 - Revoke = 111 - HasReadReceipt = 112 - Typing = 113 - Quote = 114 - Common = 200 - GroupMsg = 201 - SignalMsg = 202 + Text = 101 + Picture = 102 + Voice = 103 + Video = 104 + File = 105 + AtText = 106 + Merger = 107 + Card = 108 + Location = 109 + Custom = 110 + Revoke = 111 + HasReadReceipt = 112 + Typing = 113 + Quote = 114 + GroupHasReadReceipt = 116 + Common = 200 + GroupMsg = 201 + SignalMsg = 202 //SysRelated NotificationBegin = 1000 diff --git a/pkg/common/constant/error.go b/pkg/common/constant/error.go index b0c28725e..52c1253e1 100644 --- a/pkg/common/constant/error.go +++ b/pkg/common/constant/error.go @@ -49,12 +49,13 @@ var ( ErrTokenUnknown = ErrInfo{705, TokenUnknownMsg.Error()} ErrTokenKicked = ErrInfo{706, TokenUserKickedMsg.Error()} - ErrAccess = ErrInfo{ErrCode: 801, ErrMsg: AccessMsg.Error()} - ErrDB = ErrInfo{ErrCode: 802, ErrMsg: DBMsg.Error()} - ErrArgs = ErrInfo{ErrCode: 803, ErrMsg: ArgsMsg.Error()} - ErrStatus = ErrInfo{ErrCode: 804, ErrMsg: StatusMsg.Error()} - ErrCallback = ErrInfo{ErrCode: 809, ErrMsg: CallBackMsg.Error()} - ErrSendLimit = ErrInfo{ErrCode: 810, ErrMsg: "send msg limit, to many request, try again later"} + ErrAccess = ErrInfo{ErrCode: 801, ErrMsg: AccessMsg.Error()} + ErrDB = ErrInfo{ErrCode: 802, ErrMsg: DBMsg.Error()} + ErrArgs = ErrInfo{ErrCode: 803, ErrMsg: ArgsMsg.Error()} + ErrStatus = ErrInfo{ErrCode: 804, ErrMsg: StatusMsg.Error()} + ErrCallback = ErrInfo{ErrCode: 809, ErrMsg: CallBackMsg.Error()} + ErrSendLimit = ErrInfo{ErrCode: 810, ErrMsg: "send msg limit, to many request, try again later"} + ErrMessageHasReadDisable = ErrInfo{ErrCode: 811, ErrMsg: "message has read disable"} ) var ( diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 98ad92209..3d8ca02f5 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -2,6 +2,7 @@ package kafka import ( log2 "Open_IM/pkg/common/log" + "errors" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" ) @@ -32,18 +33,22 @@ func NewKafkaProducer(addr []string, topic string) *Producer { return &p } -func (p *Producer) SendMessage(m proto.Message, key ...string) (int32, int64, error) { +func (p *Producer) SendMessage(m proto.Message, key string, operationID string) (int32, int64, error) { + log2.Info(operationID, "SendMessage", "key ", key, m.String(), p.producer) kMsg := &sarama.ProducerMessage{} kMsg.Topic = p.topic - if len(key) == 1 { - kMsg.Key = sarama.StringEncoder(key[0]) - } + kMsg.Key = sarama.StringEncoder(key) bMsg, err := proto.Marshal(m) if err != nil { - log2.Error("", "", "proto marshal err = %s", err.Error()) + log2.Error(operationID, "", "proto marshal err = %s", err.Error()) return -1, -1, err } + if len(bMsg) == 0 { + return 0, 0, errors.New("msg content is nil") + } kMsg.Value = sarama.ByteEncoder(bMsg) - - return p.producer.SendMessage(kMsg) + log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer) + a, b, c := p.producer.SendMessage(kMsg) + log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg, p.producer) + return a, b, c }