diff --git a/config/config.yaml b/config/config.yaml index 8c1ffcc1a..c1134b9cf 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -41,11 +41,15 @@ kafka: ws2mschat: addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 topic: "ws2ms_chat" + ws2mschatoffline: + addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 + topic: "ws2ms_chat_offline" ms2pschat: addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 topic: "ms2ps_chat" consumergroupid: msgToMongo: mongo + MsgToMongoOffline: mongo_offline msgToMySql: mysql msgToPush: push diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index a67b9cff4..dff95334c 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -2,20 +2,36 @@ package logic import ( "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" "Open_IM/pkg/common/kafka" + "Open_IM/pkg/statistics" "fmt" + "sync" ) +const OnlineTopicBusy = "Busy" +const OnlineTopicVacancy = "Vacancy" + var ( - persistentCH PersistentConsumerHandler - historyCH HistoryConsumerHandler - producer *kafka.Producer + persistentCH PersistentConsumerHandler + historyCH OnlineHistoryConsumerHandler + offlineHistoryCH OfflineHistoryConsumerHandler + producer *kafka.Producer + cmdCh chan Cmd2Value + onlineTopicStatus string + w *sync.Mutex + singleMsgSuccessCount uint64 + groupMsgCount uint64 + singleMsgFailedCount uint64 ) func Init() { - + cmdCh = make(chan Cmd2Value, 10000) persistentCH.Init() - historyCH.Init() + historyCH.Init(cmdCh) + offlineHistoryCH.Init(cmdCh) + statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) + statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic) } func Run() { @@ -26,4 +42,15 @@ func Run() { fmt.Println("not start mysql consumer") } go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) + go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH) +} +func SetOnlineTopicStatus(status string) { + w.Lock() + defer w.Unlock() + onlineTopicStatus = status +} +func GetOnlineTopicStatus() string { + w.Lock() + defer w.Unlock() + return onlineTopicStatus } diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go new file mode 100644 index 000000000..c4e78f96d --- /dev/null +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -0,0 +1,116 @@ +package logic + +import ( + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + kfk "Open_IM/pkg/common/kafka" + "Open_IM/pkg/common/log" + pbMsg "Open_IM/pkg/proto/chat" + "Open_IM/pkg/utils" + "github.com/Shopify/sarama" + "github.com/golang/protobuf/proto" + "time" +) + +type OfflineHistoryConsumerHandler struct { + msgHandle map[string]fcb + cmdCh chan Cmd2Value + historyConsumerGroup *kfk.MConsumerGroup +} + +func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { + mc.msgHandle = make(map[string]fcb) + mc.cmdCh = cmdCh + mc.msgHandle[config.Config.Kafka.Ws2mschatOffline.Topic] = mc.handleChatWs2Mongo + mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschatOffline.Topic}, + config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline) + +} + +func (mc *OfflineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) { + now := time.Now() + msgFromMQ := pbMsg.MsgDataToMQ{} + err := proto.Unmarshal(msg, &msgFromMQ) + if err != nil { + log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) + return + } + operationID := msgFromMQ.OperationID + log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) + //Control whether to store offline messages (mongo) + isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) + //Control whether to store history messages (mysql) + isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) + isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) + switch msgFromMQ.MsgData.SessionType { + case constant.SingleChatType: + log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist) + if isHistory { + err := saveUserChat(msgKey, &msgFromMQ) + if err != nil { + singleMsgFailedCount++ + log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) + return + } + singleMsgSuccessCount++ + log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) + } + if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { + } else { + go sendMessageToPush(&msgFromMQ, msgKey) + } + log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) + case constant.GroupChatType: + log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist) + if isHistory { + err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ) + if err != nil { + log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error()) + return + } + groupMsgCount++ + } + go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) + case constant.NotificationChatType: + log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) + if isHistory { + err := saveUserChat(msgKey, &msgFromMQ) + if err != nil { + log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) + return + } + log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) + } + if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { + } else { + go sendMessageToPush(&msgFromMQ, msgKey) + } + log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) + default: + log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) + return + } + log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) +} + +func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (OfflineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } +func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, + claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group + log.NewDebug("", "new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) + select { + case cmd := <-mc.cmdCh: + if cmd.Cmd == OnlineTopicVacancy { + for msg := range claim.Messages() { + if GetOnlineTopicStatus() == OnlineTopicVacancy { + log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) + mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) + sess.MarkMessage(msg, "") + } + } + } + + } + return nil +} diff --git a/internal/msg_transfer/logic/history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go similarity index 71% rename from internal/msg_transfer/logic/history_msg_handler.go rename to internal/msg_transfer/logic/online_history_msg_handler.go index 27d84d673..597d39d05 100644 --- a/internal/msg_transfer/logic/history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -8,10 +8,9 @@ import ( "Open_IM/pkg/grpc-etcdv3/getcdv3" pbMsg "Open_IM/pkg/proto/chat" pbPush "Open_IM/pkg/proto/push" - "Open_IM/pkg/statistics" "Open_IM/pkg/utils" "context" - "fmt" + "errors" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" "strings" @@ -19,28 +18,52 @@ import ( ) type fcb func(msg []byte, msgKey string) - -type HistoryConsumerHandler struct { - msgHandle map[string]fcb - historyConsumerGroup *kfk.MConsumerGroup - singleMsgFailedCount uint64 - singleMsgSuccessCount uint64 - groupMsgCount uint64 +type Cmd2Value struct { + Cmd string + Value interface{} +} +type OnlineHistoryConsumerHandler struct { + msgHandle map[string]fcb + historyConsumerGroup *kfk.MConsumerGroup + cmdCh chan Cmd2Value } -func (mc *HistoryConsumerHandler) Init() { - statistics.NewStatistics(&mc.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) - statistics.NewStatistics(&mc.groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) - - mc.msgHandle = make(map[string]fcb) - mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo - mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, +func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { + och.msgHandle = make(map[string]fcb) + och.cmdCh = cmdCh + och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo + och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) } - -func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) { +func (och *OnlineHistoryConsumerHandler) TriggerCmd(status string) { + operationID := utils.OperationIDGenerator() + for { + err := sendCmd(och.cmdCh, Cmd2Value{Cmd: status, Value: ""}, 1) + if err != nil { + log.Error(operationID, "TriggerCmd failed ", err.Error(), status) + continue + } + log.Debug(operationID, "TriggerCmd success", status) + return + } +} +func sendCmd(ch chan Cmd2Value, value Cmd2Value, timeout int64) error { + var flag = 0 + select { + case ch <- value: + flag = 1 + case <-time.After(time.Second * time.Duration(timeout)): + flag = 2 + } + if flag == 1 { + return nil + } else { + return errors.New("send cmd timeout") + } +} +func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) { now := time.Now() msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(msg, &msgFromMQ) @@ -61,11 +84,11 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) if isHistory { err := saveUserChat(msgKey, &msgFromMQ) if err != nil { - mc.singleMsgFailedCount++ + singleMsgFailedCount++ log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) return } - mc.singleMsgSuccessCount++ + singleMsgSuccessCount++ log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) } if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { @@ -81,7 +104,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error()) return } - mc.groupMsgCount++ + groupMsgCount++ } go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) case constant.NotificationChatType: @@ -106,14 +129,20 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) } -func (HistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } -func (HistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } -func (mc *HistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, - claim sarama.ConsumerGroupClaim) error { +func (OnlineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } +func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, + claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group + och.TriggerCmd(OnlineTopicBusy) + SetOnlineTopicStatus(OnlineTopicBusy) for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) + och.msgHandle[msg.Topic](msg.Value, string(msg.Key)) sess.MarkMessage(msg, "") + if claim.HighWaterMarkOffset()-msg.Offset <= 1 { + och.TriggerCmd(OnlineTopicVacancy) + SetOnlineTopicStatus(OnlineTopicVacancy) + } } return nil } diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index 8bed05810..002918bae 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -19,7 +19,8 @@ type rpcChat struct { rpcRegisterName string etcdSchema string etcdAddr []string - producer *kafka.Producer + onlineProducer *kafka.Producer + offlineProducer *kafka.Producer } func NewRpcChatServer(port int) *rpcChat { @@ -30,7 +31,8 @@ func NewRpcChatServer(port int) *rpcChat { etcdSchema: config.Config.Etcd.EtcdSchema, etcdAddr: config.Config.Etcd.EtcdAddr, } - rc.producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) + rc.onlineProducer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) + rc.offlineProducer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.Ws2mschatOffline.Topic) return &rc } diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 78588cdf1..0cb78ca0a 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -10,9 +10,11 @@ import ( pbCache "Open_IM/pkg/proto/cache" pbChat "Open_IM/pkg/proto/chat" pbConversation "Open_IM/pkg/proto/conversation" + pbRelay "Open_IM/pkg/proto/relay" sdk_ws "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" + "errors" "github.com/garyburd/redigo/redis" "github.com/golang/protobuf/proto" "math/rand" @@ -188,14 +190,14 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S if isSend { msgToMQSingle.MsgData = pb.MsgData log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) - err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID) + err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) if err1 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself - err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID) + err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus) if err2 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) @@ -252,72 +254,33 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S default: } + onUserIDList, offUserIDList := getOnlineAndOfflineUserIDList(memberUserIDList, pb.OperationID) + log.Debug(pb.OperationID, onUserIDList, offUserIDList) groupID := pb.MsgData.GroupID //split parallel send var wg sync.WaitGroup var sendTag bool var split = 50 - remain := len(memberUserIDList) % split - for i := 0; i < len(memberUserIDList)/split; i++ { + remain := len(onUserIDList) % split + for i := 0; i < len(onUserIDList)/split; i++ { wg.Add(1) - go func(list []string) { - // log.Debug(pb.OperationID, "split userID ", list) - groupPB := pbChat.SendMsgReq{Token: pb.Token, OperationID: pb.OperationID, MsgData: &sdk_ws.MsgData{OfflinePushInfo: &sdk_ws.OfflinePushInfo{}}} - *groupPB.MsgData = *pb.MsgData - if pb.MsgData.OfflinePushInfo != nil { - *groupPB.MsgData.OfflinePushInfo = *pb.MsgData.OfflinePushInfo - } - msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} - for _, v := range list { - groupPB.MsgData.RecvID = v - isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, &groupPB) - if isSend { - msgToMQGroup.MsgData = groupPB.MsgData - // log.Debug(groupPB.OperationID, "sendMsgToKafka, ", v, groupID, msgToMQGroup.String()) - err := rpc.sendMsgToKafka(&msgToMQGroup, v) - if err != nil { - log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) - } else { - sendTag = true - } - } else { - log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v) - } - } - wg.Done() - }(memberUserIDList[i*split : (i+1)*split]) + go rpc.sendMsgToGroup(onUserIDList[i*split:(i+1)*split], pb, constant.OnlineStatus, &sendTag, &wg) } if remain > 0 { wg.Add(1) - go func(list []string) { - // log.Debug(pb.OperationID, "split userID ", list) - groupPB := pbChat.SendMsgReq{Token: pb.Token, OperationID: pb.OperationID, MsgData: &sdk_ws.MsgData{OfflinePushInfo: &sdk_ws.OfflinePushInfo{}}} - *groupPB.MsgData = *pb.MsgData - if pb.MsgData.OfflinePushInfo != nil { - *groupPB.MsgData.OfflinePushInfo = *pb.MsgData.OfflinePushInfo - } - msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} - for _, v := range list { - groupPB.MsgData.RecvID = v - isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, &groupPB) - if isSend { - msgToMQGroup.MsgData = groupPB.MsgData - // log.Debug(groupPB.OperationID, "sendMsgToKafka, ", v, groupID, msgToMQGroup.String()) - err := rpc.sendMsgToKafka(&msgToMQGroup, v) - if err != nil { - log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) - } else { - sendTag = true - } - } else { - log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v) - } - } - wg.Done() - }(memberUserIDList[split*(len(memberUserIDList)/split):]) + go rpc.sendMsgToGroup(onUserIDList[split*(len(onUserIDList)/split):], pb, constant.OnlineStatus, &sendTag, &wg) + } + wg.Wait() + remain = len(offUserIDList) % split + for i := 0; i < len(offUserIDList)/split; i++ { + wg.Add(1) + go rpc.sendMsgToGroup(offUserIDList[i*split:(i+1)*split], pb, constant.OfflineStatus, &sendTag, &wg) + } + if remain > 0 { + wg.Add(1) + go rpc.sendMsgToGroup(offUserIDList[split*(len(offUserIDList)/split):], pb, constant.OfflineStatus, &sendTag, &wg) } wg.Wait() - log.Info(msgToMQSingle.OperationID, "addUidList", addUidList) for _, v := range addUidList { pb.MsgData.RecvID = v @@ -325,7 +288,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.Info(msgToMQSingle.OperationID, "isSend", isSend) if isSend { msgToMQSingle.MsgData = pb.MsgData - err := rpc.sendMsgToKafka(&msgToMQSingle, v) + err := rpc.sendMsgToKafka(&msgToMQSingle, v, constant.OnlineStatus) if err != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:UserId", v, msgToMQSingle.String()) } else { @@ -397,14 +360,14 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S case constant.NotificationChatType: msgToMQSingle.MsgData = pb.MsgData log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) - err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID) + err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) if err1 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself - err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID) + err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus) if err2 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) @@ -416,12 +379,22 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } } -func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string) error { - pid, offset, err := rpc.producer.SendMessage(m, key) - if err != nil { - log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key) +func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status string) error { + switch status { + case constant.OnlineStatus: + pid, offset, err := rpc.onlineProducer.SendMessage(m, key) + if err != nil { + log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) + } + return err + case constant.OfflineStatus: + pid, offset, err := rpc.offlineProducer.SendMessage(m, key) + if err != nil { + log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) + } + return err } - return err + return errors.New("status error") } func GetMsgID(sendID string) string { t := time.Now().Format("2006-01-02 15:04:05") @@ -736,3 +709,68 @@ func Notification(n *NotificationMsg) { log.NewError(req.OperationID, "SendMsg rpc failed, ", req.String(), reply.ErrCode, reply.ErrMsg) } } +func getOnlineAndOfflineUserIDList(memberList []string, operationID string) (onllUserIDList []string, offlUserIDList []string) { + var wsResult []*pbRelay.GetUsersOnlineStatusResp_SuccessResult + req := &pbRelay.GetUsersOnlineStatusReq{} + req.UserIDList = memberList + req.OperationID = operationID + req.OpUserID = config.Config.Manager.AppManagerUid[0] + flag := false + grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) + for _, v := range grpcCons { + client := pbRelay.NewOnlineMessageRelayServiceClient(v) + reply, err := client.GetUsersOnlineStatus(context.Background(), req) + if err != nil { + log.NewError(operationID, "GetUsersOnlineStatus rpc err", req.String(), err.Error()) + continue + } else { + if reply.ErrCode == 0 { + wsResult = append(wsResult, reply.SuccessResult...) + } + } + } + log.NewInfo(operationID, "call GetUsersOnlineStatus rpc server is success", wsResult) + //Online data merge of each node + for _, v1 := range memberList { + flag = false + + for _, v2 := range wsResult { + if v2.UserID == v1 { + flag = true + onllUserIDList = append(onllUserIDList, v1) + } + + } + if !flag { + offlUserIDList = append(offlUserIDList, v1) + } + } + return onllUserIDList, offlUserIDList +} + +func (rpc *rpcChat) sendMsgToGroup(list []string, pb *pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) { + // log.Debug(pb.OperationID, "split userID ", list) + groupPB := pbChat.SendMsgReq{Token: pb.Token, OperationID: pb.OperationID, MsgData: &sdk_ws.MsgData{OfflinePushInfo: &sdk_ws.OfflinePushInfo{}}} + *groupPB.MsgData = *pb.MsgData + if pb.MsgData.OfflinePushInfo != nil { + *groupPB.MsgData.OfflinePushInfo = *pb.MsgData.OfflinePushInfo + } + msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} + for _, v := range list { + groupPB.MsgData.RecvID = v + isSend := modifyMessageByUserMessageReceiveOpt(v, groupPB.MsgData.GroupID, constant.GroupChatType, &groupPB) + if isSend { + msgToMQGroup.MsgData = groupPB.MsgData + // log.Debug(groupPB.OperationID, "sendMsgToKafka, ", v, groupID, msgToMQGroup.String()) + err := rpc.sendMsgToKafka(&msgToMQGroup, v, status) + if err != nil { + log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) + } else { + *sendTag = true + } + } else { + log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v) + } + } + wg.Done() +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 9d2ca8e55..8c08da2f5 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -192,14 +192,19 @@ type config struct { Addr []string `yaml:"addr"` Topic string `yaml:"topic"` } + Ws2mschatOffline struct { + Addr []string `yaml:"addr"` + Topic string `yaml:"topic"` + } Ms2pschat struct { Addr []string `yaml:"addr"` Topic string `yaml:"topic"` } ConsumerGroupID struct { - MsgToMongo string `yaml:"msgToMongo"` - MsgToMySql string `yaml:"msgToMySql"` - MsgToPush string `yaml:"msgToPush"` + MsgToMongo string `yaml:"msgToMongo"` + MsgToMongoOffline string `yaml:"msgToMongoOffline"` + MsgToMySql string `yaml:"msgToMySql"` + MsgToPush string `yaml:"msgToPush"` } } Secret string `yaml:"secret"`