You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
203 lines
5.9 KiB
203 lines
5.9 KiB
1 year ago
|
package main
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"github.com/IBM/sarama"
|
||
|
"github.com/OpenIMSDK/protocol/constant"
|
||
|
"github.com/OpenIMSDK/protocol/msg"
|
||
|
"github.com/OpenIMSDK/protocol/sdkws"
|
||
|
"github.com/OpenIMSDK/tools/mw"
|
||
|
"github.com/golang/protobuf/proto"
|
||
|
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
|
||
|
pbmsg "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/proto/msg"
|
||
|
"google.golang.org/grpc"
|
||
|
"google.golang.org/grpc/credentials/insecure"
|
||
|
"log"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
func main() {
|
||
|
|
||
|
var (
|
||
|
topic = "ws2ms_chat" // v2版本配置文件kafka.topic.ws2ms_chat
|
||
|
kafkaAddr = "127.0.0.1:9092" // v2版本配置文件kafka.topic.addr
|
||
|
rpcAddr = "127.0.0.1:10130" // v3版本配置文件rpcPort.openImMessagePort
|
||
|
adminUserID = "openIM123456" // v3版本管理员userID
|
||
|
concurrency = 1 // 并发数量
|
||
|
)
|
||
|
|
||
|
getRpcConn := func() (*grpc.ClientConn, error) {
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||
|
defer cancel()
|
||
|
return grpc.DialContext(ctx, rpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), mw.GrpcClient())
|
||
|
}
|
||
|
conn, err := getRpcConn()
|
||
|
if err != nil {
|
||
|
log.Println("get rpc conn", err)
|
||
|
return
|
||
|
}
|
||
|
defer conn.Close()
|
||
|
|
||
|
msgClient := msg.NewMsgClient(conn)
|
||
|
|
||
|
conf := sarama.NewConfig()
|
||
|
conf.Consumer.Offsets.Initial = sarama.OffsetOldest
|
||
|
|
||
|
consumer, err := sarama.NewConsumer([]string{kafkaAddr}, conf)
|
||
|
if err != nil {
|
||
|
log.Println("kafka consumer conn", err)
|
||
|
return
|
||
|
}
|
||
|
partitions, err := consumer.Partitions(topic) // Get all partitions according to topic
|
||
|
if err != nil {
|
||
|
log.Println("kafka partitions", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if len(partitions) == 0 {
|
||
|
log.Println("kafka partitions is empty")
|
||
|
return
|
||
|
}
|
||
|
log.Println("kafka partitions", partitions)
|
||
|
|
||
|
msgCh := make(chan *pbmsg.MsgDataToMQ, concurrency*2)
|
||
|
|
||
|
var kfkWg sync.WaitGroup
|
||
|
|
||
|
distinct := make(map[string]struct{})
|
||
|
var lock sync.Mutex
|
||
|
|
||
|
for _, partition := range partitions {
|
||
|
kfkWg.Add(1)
|
||
|
go func(partition int32) {
|
||
|
defer kfkWg.Done()
|
||
|
pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
|
||
|
if err != nil {
|
||
|
log.Printf("kafka Consume Partition %d failed %s\n", partition, err)
|
||
|
return
|
||
|
}
|
||
|
defer pc.Close()
|
||
|
ch := pc.Messages()
|
||
|
for {
|
||
|
select {
|
||
|
case <-time.After(time.Second * 10): // 10s读取不到就关闭
|
||
|
return
|
||
|
case message, ok := <-ch:
|
||
|
if !ok {
|
||
|
return
|
||
|
}
|
||
|
msgFromMQV2 := pbmsg.MsgDataToMQ{}
|
||
|
err := proto.Unmarshal(message.Value, &msgFromMQV2)
|
||
|
if err != nil {
|
||
|
log.Printf("kafka msg partition %d offset %d unmarshal failed %s\n", message.Partition, message.Offset, message.Value)
|
||
|
continue
|
||
|
}
|
||
|
if msgFromMQV2.MsgData == nil || msgFromMQV2.OperationID == "" {
|
||
|
continue
|
||
|
}
|
||
|
if msgFromMQV2.MsgData.ContentType < constant.ContentTypeBegin || msgFromMQV2.MsgData.ContentType > constant.AdvancedText {
|
||
|
continue
|
||
|
}
|
||
|
lock.Lock()
|
||
|
_, exist := distinct[msgFromMQV2.MsgData.ClientMsgID]
|
||
|
if !exist {
|
||
|
distinct[msgFromMQV2.MsgData.ClientMsgID] = struct{}{}
|
||
|
}
|
||
|
lock.Unlock()
|
||
|
if exist {
|
||
|
continue
|
||
|
}
|
||
|
msgCh <- &msgFromMQV2
|
||
|
}
|
||
|
}
|
||
|
}(partition)
|
||
|
}
|
||
|
|
||
|
go func() {
|
||
|
kfkWg.Wait()
|
||
|
close(msgCh)
|
||
|
}()
|
||
|
|
||
|
var msgWg sync.WaitGroup
|
||
|
|
||
|
var (
|
||
|
success int64
|
||
|
failed int64
|
||
|
)
|
||
|
for i := 0; i < concurrency; i++ {
|
||
|
msgWg.Add(1)
|
||
|
go func() {
|
||
|
defer msgWg.Done()
|
||
|
for message := range msgCh {
|
||
|
HandlerV2Msg(msgClient, adminUserID, message, &success, &failed)
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
msgWg.Wait()
|
||
|
log.Printf("total %d success %d failed %d\n", success+failed, success, failed)
|
||
|
}
|
||
|
|
||
|
func HandlerV2Msg(msgClient msg.MsgClient, adminUserID string, msgFromMQV2 *pbmsg.MsgDataToMQ, success *int64, failed *int64) {
|
||
|
msgData := &sdkws.MsgData{
|
||
|
SendID: msgFromMQV2.MsgData.SendID,
|
||
|
RecvID: msgFromMQV2.MsgData.RecvID,
|
||
|
GroupID: msgFromMQV2.MsgData.GroupID,
|
||
|
ClientMsgID: msgFromMQV2.MsgData.ClientMsgID,
|
||
|
ServerMsgID: msgFromMQV2.MsgData.ServerMsgID,
|
||
|
SenderPlatformID: msgFromMQV2.MsgData.SenderPlatformID,
|
||
|
SenderNickname: msgFromMQV2.MsgData.SenderNickname,
|
||
|
SenderFaceURL: msgFromMQV2.MsgData.SenderFaceURL,
|
||
|
SessionType: msgFromMQV2.MsgData.SessionType,
|
||
|
MsgFrom: msgFromMQV2.MsgData.MsgFrom,
|
||
|
ContentType: msgFromMQV2.MsgData.ContentType,
|
||
|
SendTime: msgFromMQV2.MsgData.SendTime,
|
||
|
CreateTime: msgFromMQV2.MsgData.CreateTime,
|
||
|
Status: msgFromMQV2.MsgData.Status,
|
||
|
IsRead: false,
|
||
|
Options: msgFromMQV2.MsgData.Options,
|
||
|
AtUserIDList: msgFromMQV2.MsgData.AtUserIDList,
|
||
|
AttachedInfo: msgFromMQV2.MsgData.AttachedInfo,
|
||
|
Ex: msgFromMQV2.MsgData.Ex,
|
||
|
}
|
||
|
|
||
|
if msgFromMQV2.MsgData.OfflinePushInfo != nil {
|
||
|
msgData.OfflinePushInfo = &sdkws.OfflinePushInfo{
|
||
|
Title: msgFromMQV2.MsgData.OfflinePushInfo.Title,
|
||
|
Desc: msgFromMQV2.MsgData.OfflinePushInfo.Desc,
|
||
|
Ex: msgFromMQV2.MsgData.OfflinePushInfo.Ex,
|
||
|
IOSPushSound: msgFromMQV2.MsgData.OfflinePushInfo.IOSPushSound,
|
||
|
IOSBadgeCount: msgFromMQV2.MsgData.OfflinePushInfo.IOSBadgeCount,
|
||
|
SignalInfo: "",
|
||
|
}
|
||
|
}
|
||
|
switch msgData.ContentType {
|
||
|
case constant.Text:
|
||
|
data, err := json.Marshal(apistruct.TextElem{
|
||
|
Content: string(msgFromMQV2.MsgData.Content),
|
||
|
})
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
msgData.Content = data
|
||
|
default:
|
||
|
msgData.Content = msgFromMQV2.MsgData.Content
|
||
|
}
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||
|
defer cancel()
|
||
|
ctx = context.WithValue(context.Background(), constant.OperationID, msgFromMQV2.OperationID)
|
||
|
ctx = context.WithValue(ctx, constant.OpUserID, adminUserID)
|
||
|
|
||
|
resp, err := msgClient.SendMsg(ctx, &msg.SendMsgReq{MsgData: msgData})
|
||
|
if err != nil {
|
||
|
atomic.AddInt64(failed, 1)
|
||
|
log.Printf("send msg %+v failed %s\n", msgData, err)
|
||
|
return
|
||
|
}
|
||
|
atomic.AddInt64(success, 1)
|
||
|
log.Printf("send msg success %+v resp %+v\n", msgData, resp)
|
||
|
}
|