feat: add openim error handle

pull/1906/head
Xinwei Xiong (cubxxw) 2 years ago
parent 4e3e93cd75
commit 8a7d27eae8

@ -37,10 +37,16 @@ func TestCompressDecompress(t *testing.T) {
// compress
dest, err := compressor.CompressWithPool(src)
if err != nil {
t.Log(err)
}
assert.Equal(t, nil, err)
// decompress
res, err := compressor.DecompressWithPool(dest)
if err != nil {
t.Log(err)
}
assert.Equal(t, nil, err)
// check
@ -60,10 +66,16 @@ func TestCompressDecompressWithConcurrency(t *testing.T) {
// compress
dest, err := compressor.CompressWithPool(src)
if err != nil {
t.Log(err)
}
assert.Equal(t, nil, err)
// decompress
res, err := compressor.DecompressWithPool(dest)
if err != nil {
t.Log(err)
}
assert.Equal(t, nil, err)
// check
@ -99,6 +111,7 @@ func BenchmarkDecompress(b *testing.B) {
compressor := NewGzipCompressor()
comdata, err := compressor.Compress(src)
assert.Equal(b, nil, err)
for i := 0; i < b.N; i++ {

@ -23,14 +23,7 @@ import (
// RunWsAndServer run ws server.
func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error {
fmt.Println(
"start rpc/msg_gateway server, port: ",
rpcPort,
wsPort,
prometheusPort,
", OpenIM version: ",
config.Version,
)
fmt.Println("start rpc/msg_gateway server, port: ", rpcPort, wsPort, prometheusPort, ", OpenIM version: ",config.Version)
longServer, err := NewWsServer(
WithPort(wsPort),
WithMaxConnNum(int64(config.Config.LongConnSvr.WebsocketMaxConnNum)),

@ -74,7 +74,8 @@ func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) er
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return err
// The upgrader.Upgrade method usually returns enough error messages to diagnose problems that may occur during the upgrade
return errs.Wrap(err, "GenerateLongConn: WebSocket upgrade failed")
}
d.conn = conn
return nil

@ -20,6 +20,7 @@ import (
"github.com/OpenIMSDK/protocol/push"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/errs"
"github.com/go-playground/validator/v10"
"google.golang.org/protobuf/proto"
@ -119,18 +120,18 @@ func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDi
func (g GrpcHandler) GetSeq(context context.Context, data *Req) ([]byte, error) {
req := sdkws.GetMaxSeqReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, err
return nil, errs.Wrap(err, "GetSeq: error unmarshaling request")
}
if err := g.validate.Struct(&req); err != nil {
return nil, err
return nil, errs.Wrap(err, "GetSeq: validation failed")
}
resp, err := g.msgRpcClient.GetMaxSeq(context, &req)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "GetSeq: error calling GetMaxSeq on msgRpcClient")
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "GetSeq: error marshaling response")
}
return c, nil
}

@ -45,8 +45,13 @@ import (
)
type MsgTransfer struct {
historyCH *OnlineHistoryRedisConsumerHandler // 这个消费者聚合消息, 订阅的topicws2ms_chat, 修改通知发往msg_to_modify topic, 消息存入redis后Incr Redis, 再发消息到ms2pschat topic推送 发消息到msg_to_mongo topic持久化
historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息以及处理删除通知消息删除的 订阅的topic: msg_to_mongo
// This consumer aggregated messages, subscribed to the topic:ws2ms_chat,
// the modification notification is sent to msg_to_modify topic, the message is stored in redis, Incr Redis,
// and then the message is sent to ms2pschat topic for push, and the message is sent to msg_to_mongo topic for persistence
historyCH *OnlineHistoryRedisConsumerHandler
// mongoDB batch insert, delete messages in redis after success,
// and handle the deletion notification message deleted subscriptions topic: msg_to_mongo
historyMongoCH *OnlineHistoryMongoConsumerHandler
ctx context.Context
cancel context.CancelFunc
}
@ -65,6 +70,7 @@ func StartTransfer(prometheusPort int) error {
if err = mongo.CreateMsgIndex(); err != nil {
return err
}
client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
if err != nil {
return err
@ -73,6 +79,7 @@ func StartTransfer(prometheusPort int) error {
if err := client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil {
return err
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
msgModel := cache.NewMsgCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
@ -106,7 +113,7 @@ func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcCli
}
func (m *MsgTransfer) Start(prometheusPort int) error {
fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
fmt.Println("Start msg transfer", "prometheusPort:", prometheusPort)
if prometheusPort <= 0 {
return errs.Wrap(errors.New("prometheusPort not correct"))
}

@ -155,15 +155,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
notStorageNotificationList,
)
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.uniqueKey, conversationIDNotification, modifyMsgList); err != nil {
log.ZError(
ctx,
"msg to modify mq error",
err,
"uniqueKey",
msgChannelValue.uniqueKey,
"modifyMsgList",
modifyMsgList,
)
log.ZError(ctx, "msg to modify mq error", err, "uniqueKey", msgChannelValue.uniqueKey, "modifyMsgList", modifyMsgList)
}
}
}

@ -31,12 +31,7 @@ func url() string {
return config.Config.Callback.CallbackUrl
}
func callbackOfflinePush(
ctx context.Context,
userIDs []string,
msg *sdkws.MsgData,
offlinePushUserIDs *[]string,
) error {
func callbackOfflinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error {
if !config.Config.Callback.CallbackOfflinePush.Enable || msg.ContentType == constant.Typing {
return nil
}
@ -59,10 +54,12 @@ func callbackOfflinePush(
AtUserIDs: msg.AtUserIDList,
Content: GetContent(msg),
}
resp := &callbackstruct.CallbackBeforePushResp{}
if err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOfflinePush); err != nil {
return err
}
if len(resp.UserIDs) != 0 {
*offlinePushUserIDs = resp.UserIDs
}

@ -25,7 +25,6 @@ import (
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
@ -102,7 +101,10 @@ func PostReturn(ctx context.Context, url string, header map[string]string, input
return err
}
err = json.Unmarshal(b, output)
return err
if err != nil {
return errs.Wrap(err, "PostReturn: JSON unmarshal failed")
}
return nil
}
func callBackPostReturn(ctx context.Context, url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error {
@ -127,7 +129,6 @@ func callBackPostReturn(ctx context.Context, url, command string, input interfac
}
if err := output.Parse(); err != nil {
log.ZWarn(ctx, "callback parse failed", err, "url", url, "input", input, "response", string(b))
return err
}
log.ZInfo(ctx, "callback success", "url", url, "input", input, "response", string(b))
return nil

@ -17,9 +17,8 @@ package kafka
import (
"sync"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/IBM/sarama"
"github.com/OpenIMSDK/tools/errs"
)
type Consumer struct {
@ -30,28 +29,33 @@ type Consumer struct {
Consumer sarama.Consumer
}
func NewKafkaConsumer(addr []string, topic string) *Consumer {
p := Consumer{}
p.Topic = topic
p.addr = addr
consumerConfig := sarama.NewConfig()
if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
consumerConfig.Net.SASL.Enable = true
consumerConfig.Net.SASL.User = config.Config.Kafka.Username
consumerConfig.Net.SASL.Password = config.Config.Kafka.Password
func NewKafkaConsumer(addr []string, topic string, kafkaConfig *sarama.Config) (*Consumer, error) {
p := Consumer{
Topic: topic,
addr: addr,
}
if kafkaConfig.Net.SASL.User != "" && kafkaConfig.Net.SASL.Password != "" {
kafkaConfig.Net.SASL.Enable = true
}
err := SetupTLSConfig(kafkaConfig)
if err != nil {
return nil, err
}
SetupTLSConfig(consumerConfig)
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
consumer, err := sarama.NewConsumer(p.addr, kafkaConfig)
if err != nil {
panic(err.Error())
return nil, errs.Wrap(err, "NewKafkaConsumer: creating consumer failed")
}
p.Consumer = consumer
partitionList, err := consumer.Partitions(p.Topic)
if err != nil {
panic(err.Error())
return nil, errs.Wrap(err, "NewKafkaConsumer: getting partitions failed")
}
p.PartitionList = partitionList
return &p
return &p, nil
}

@ -94,7 +94,6 @@ func NewKafkaProducer(addr []string, topic string) (*Producer, error) {
}
time.Sleep(1 * time.Second) // Wait before retrying
}
// Panic if unable to create producer after retries
if err != nil {
return nil, errs.Wrap(errors.New("failed to create Kafka producer: " + err.Error()))

@ -20,22 +20,28 @@ import (
"strings"
"github.com/IBM/sarama"
"github.com/OpenIMSDK/tools/errs"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/tls"
)
// SetupTLSConfig set up the TLS config from config file.
func SetupTLSConfig(cfg *sarama.Config) {
func SetupTLSConfig(cfg *sarama.Config) error {
if config.Config.Kafka.TLS != nil {
cfg.Net.TLS.Enable = true
cfg.Net.TLS.Config = tls.NewTLSConfig(
tlsConfig, err := tls.NewTLSConfig(
config.Config.Kafka.TLS.ClientCrt,
config.Config.Kafka.TLS.ClientKey,
config.Config.Kafka.TLS.CACrt,
[]byte(config.Config.Kafka.TLS.ClientKeyPwd),
)
if err != nil {
return errs.Wrap(err, "SetupTLSConfig: failed to set up TLS config")
}
cfg.Net.TLS.Config = tlsConfig
}
return nil
}
// getEnvOrConfig returns the value of the environment variable if it exists,

@ -21,6 +21,7 @@ import (
"errors"
"os"
"github.com/OpenIMSDK/tools/errs"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
@ -49,37 +50,41 @@ func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) {
}
// NewTLSConfig setup the TLS config from general config file.
func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte) *tls.Config {
tlsConfig := tls.Config{}
func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte) (*tls.Config, error) {
var tlsConfig tls.Config
if clientCertFile != "" && clientKeyFile != "" {
certPEMBlock, err := os.ReadFile(clientCertFile)
if err != nil {
panic(err)
return nil, errs.Wrap(err, "NewTLSConfig: failed to read client cert file")
}
keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd)
if err != nil {
panic(err)
return nil, err
}
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
if err != nil {
panic(err)
return nil, errs.Wrap(err, "NewTLSConfig: failed to create X509 key pair")
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
if caCertFile != "" {
caCert, err := os.ReadFile(caCertFile)
if err != nil {
panic(err)
return nil, errs.Wrap(err, "NewTLSConfig: failed to read CA cert file")
}
caCertPool := x509.NewCertPool()
ok := caCertPool.AppendCertsFromPEM(caCert)
if !ok {
panic(errors.New("not a valid CA cert"))
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
return nil, errors.New("NewTLSConfig: not a valid CA cert")
}
tlsConfig.RootCAs = caCertPool
}
tlsConfig.InsecureSkipVerify = config.Config.Kafka.TLS.InsecureSkipVerify
return &tlsConfig
return &tlsConfig, nil
}

@ -23,6 +23,7 @@ import (
pbconversation "github.com/OpenIMSDK/protocol/conversation"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/errs"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
@ -36,7 +37,7 @@ type Conversation struct {
func NewConversation(discov discoveryregistry.SvcDiscoveryRegistry) *Conversation {
conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImConversationName)
if err != nil {
panic(err)
util.ExitWithError(err)
}
client := pbconversation.NewConversationClient(conn)
return &Conversation{discov: discov, conn: conn, Client: client}

@ -24,6 +24,7 @@ import (
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
)
type Friend struct {
@ -35,7 +36,7 @@ type Friend struct {
func NewFriend(discov discoveryregistry.SvcDiscoveryRegistry) *Friend {
conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImFriendName)
if err != nil {
panic(err)
util.ExitWithError(err)
}
client := friend.NewFriendClient(conn)
return &Friend{discov: discov, conn: conn, Client: client}
@ -62,7 +63,7 @@ func (f *FriendRpcClient) GetFriendsInfo(
return
}
// possibleFriendUserID是否在userID的好友中.
// possibleFriendUserID Is PossibleFriendUserId's friends.
func (f *FriendRpcClient) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (bool, error) {
resp, err := f.Client.IsFriend(ctx, &friend.IsFriendReq{UserID1: userID, UserID2: possibleFriendUserID})
if err != nil {

@ -28,6 +28,7 @@ import (
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
)
type Group struct {
@ -39,7 +40,7 @@ type Group struct {
func NewGroup(discov discoveryregistry.SvcDiscoveryRegistry) *Group {
conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImGroupName)
if err != nil {
panic(err)
util.ExitWithError(err)
}
client := group.NewGroupClient(conn)
return &Group{discov: discov, conn: conn, Client: client}

@ -23,6 +23,7 @@ import (
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
)
type Push struct {
@ -34,7 +35,7 @@ type Push struct {
func NewPush(discov discoveryregistry.SvcDiscoveryRegistry) *Push {
conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImPushName)
if err != nil {
panic(err)
util.ExitWithError(err)
}
return &Push{
discov: discov,
@ -49,9 +50,6 @@ func NewPushRpcClient(discov discoveryregistry.SvcDiscoveryRegistry) PushRpcClie
return PushRpcClient(*NewPush(discov))
}
func (p *PushRpcClient) DelUserPushToken(
ctx context.Context,
req *push.DelUserPushTokenReq,
) (*push.DelUserPushTokenResp, error) {
func (p *PushRpcClient) DelUserPushToken(ctx context.Context, req *push.DelUserPushTokenReq) (*push.DelUserPushTokenResp, error) {
return p.Client.DelUserPushToken(ctx, req)
}

@ -24,8 +24,10 @@ import (
"github.com/OpenIMSDK/protocol/third"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/errs"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
)
type Third struct {
@ -38,35 +40,42 @@ type Third struct {
func NewThird(discov discoveryregistry.SvcDiscoveryRegistry) *Third {
conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImThirdName)
if err != nil {
panic(err)
util.ExitWithError(err)
}
client := third.NewThirdClient(conn)
minioClient, err := minioInit()
if err != nil {
panic(err)
util.ExitWithError(err)
}
return &Third{discov: discov, Client: client, conn: conn, MinioClient: minioClient}
}
func minioInit() (*minio.Client, error) {
minioClient := &minio.Client{}
initUrl := config.Config.Object.Minio.Endpoint
minioUrl, err := url.Parse(initUrl)
// Retrieve MinIO configuration details
endpoint := config.Config.Object.Minio.Endpoint
accessKeyID := config.Config.Object.Minio.AccessKeyID
secretAccessKey := config.Config.Object.Minio.SecretAccessKey
// Parse the MinIO URL to determine if the connection should be secure
minioURL, err := url.Parse(endpoint)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "minioInit: failed to parse MinIO endpoint URL")
}
// Determine the security of the connection based on the scheme
secure := minioURL.Scheme == "https"
// Setup MinIO client options
opts := &minio.Options{
Creds: credentials.NewStaticV4(config.Config.Object.Minio.AccessKeyID, config.Config.Object.Minio.SecretAccessKey, ""),
// Region: config.Config.Credential.Minio.Location,
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
Secure: secure,
}
if minioUrl.Scheme == "http" {
opts.Secure = false
} else if minioUrl.Scheme == "https" {
opts.Secure = true
}
minioClient, err = minio.New(minioUrl.Host, opts)
// Initialize MinIO client
minioClient, err := minio.New(minioURL.Host, opts)
if err != nil {
return nil, err
return nil, errs.Wrap(err, "minioInit: failed to create MinIO client")
}
return minioClient, nil
}

@ -19,6 +19,7 @@ import (
"strings"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"google.golang.org/grpc"
@ -42,7 +43,7 @@ type User struct {
func NewUser(discov discoveryregistry.SvcDiscoveryRegistry) *User {
conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImUserName)
if err != nil {
panic(err)
util.ExitWithError(err)
}
client := user.NewUserClient(conn)
return &User{Discov: discov, Client: client, conn: conn}

Loading…
Cancel
Save