From f8b223a19c20cf3cde9b2b274e087263bb6d0211 Mon Sep 17 00:00:00 2001 From: chenqinghe Date: Wed, 29 Dec 2021 21:53:17 +0800 Subject: [PATCH] fix: fix some compile errors --- cmd/open_im_api/main.go | 2 ++ internal/push/logic/init.go | 7 ++++--- internal/push/logic/push_handler.go | 29 ++++++++++------------------- internal/rpc/chat/rpcChat.go | 13 ++++++++----- 4 files changed, 24 insertions(+), 27 deletions(-) diff --git a/cmd/open_im_api/main.go b/cmd/open_im_api/main.go index a42c3c706..7603ed0f3 100644 --- a/cmd/open_im_api/main.go +++ b/cmd/open_im_api/main.go @@ -12,7 +12,9 @@ import ( "Open_IM/pkg/common/log" "Open_IM/pkg/utils" "flag" + "os" "strconv" + "github.com/gin-gonic/gin" //"syscall" diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index 9fba41bdd..3c9a66020 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -8,8 +8,9 @@ package logic import ( "Open_IM/pkg/common/config" - "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" + "Open_IM/pkg/common/mq" + "Open_IM/pkg/common/mq/kafka" "Open_IM/pkg/utils" ) @@ -17,7 +18,7 @@ var ( rpcServer RPCServer pushCh PushConsumerHandler pushTerminal []int32 - producer *kafka.Producer + producer mq.Producer ) func Init(rpcPort int) { @@ -32,5 +33,5 @@ func init() { func Run() { go rpcServer.run() - go pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&pushCh) + go pushCh.pushConsumerGroup.Start() } diff --git a/internal/push/logic/push_handler.go b/internal/push/logic/push_handler.go index 22382a5b9..643b30082 100644 --- a/internal/push/logic/push_handler.go +++ b/internal/push/logic/push_handler.go @@ -8,34 +8,33 @@ package logic import ( "Open_IM/pkg/common/config" - kfk "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" + "Open_IM/pkg/common/mq" + kfk "Open_IM/pkg/common/mq/kafka" pbChat "Open_IM/pkg/proto/chat" pbRelay "Open_IM/pkg/proto/relay" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" ) -type fcb func(msg []byte) - type PushConsumerHandler struct { - msgHandle map[string]fcb - pushConsumerGroup *kfk.MConsumerGroup + pushConsumerGroup mq.Consumer } func (ms *PushConsumerHandler) Init() { - ms.msgHandle = make(map[string]fcb) - ms.msgHandle[config.Config.Kafka.Ms2pschat.Topic] = ms.handleMs2PsChat ms.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToPush) + + ms.pushConsumerGroup.RegisterMessageHandler(config.Config.Kafka.Ms2pschat.Topic, mq.MessageHandleFunc(ms.handleMs2PsChat)) } -func (ms *PushConsumerHandler) handleMs2PsChat(msg []byte) { +func (ms *PushConsumerHandler) handleMs2PsChat(message *mq.Message) error { + msg := message.Value log.InfoByKv("msg come from kafka And push!!!", "", "msg", string(msg)) pbData := pbChat.MsgSvrToPushSvrChatMsg{} if err := proto.Unmarshal(msg, &pbData); err != nil { log.ErrorByKv("push Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) - return + return nil // not retry } sendPbData := pbRelay.MsgToUserReq{} sendPbData.SendTime = pbData.SendTime @@ -54,14 +53,6 @@ func (ms *PushConsumerHandler) handleMs2PsChat(msg []byte) { sendPbData.RecvSeq = pbData.RecvSeq //Call push module to send message to the user MsgToUser(&sendPbData, pbData.OfflineInfo, pbData.Options) -} -func (PushConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } -func (PushConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } -func (ms *PushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, - claim sarama.ConsumerGroupClaim) error { - for msg := range claim.Messages() { - log.InfoByKv("kafka get info to mysql", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - ms.msgHandle[msg.Topic](msg.Value) - } + return nil } diff --git a/internal/rpc/chat/rpcChat.go b/internal/rpc/chat/rpcChat.go index e3d2460dd..e220a1860 100644 --- a/internal/rpc/chat/rpcChat.go +++ b/internal/rpc/chat/rpcChat.go @@ -1,16 +1,19 @@ package chat import ( + "net" + "strconv" + "strings" + "Open_IM/pkg/common/config" - "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" + "Open_IM/pkg/common/mq" + "Open_IM/pkg/common/mq/kafka" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbChat "Open_IM/pkg/proto/chat" "Open_IM/pkg/utils" + "google.golang.org/grpc" - "net" - "strconv" - "strings" ) type rpcChat struct { @@ -18,7 +21,7 @@ type rpcChat struct { rpcRegisterName string etcdSchema string etcdAddr []string - producer *kafka.Producer + producer mq.Producer } func NewRpcChatServer(port int) *rpcChat {