Merge branch 'OpenIMSDK:main' into main

pull/952/head
withchao 2 years ago committed by GitHub
commit c9e1d4413a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -37,7 +37,7 @@ require (
require github.com/google/uuid v1.3.0
require (
github.com/OpenIMSDK/protocol v0.0.14
github.com/OpenIMSDK/protocol v0.0.15
github.com/OpenIMSDK/tools v0.0.14
github.com/aliyun/aliyun-oss-go-sdk v2.2.8+incompatible
github.com/go-redis/redis v6.15.9+incompatible

@ -17,8 +17,8 @@ cloud.google.com/go/storage v1.30.1/go.mod h1:NfxhC0UJE1aXSx7CIIbCf7y9HKT7Biccwk
firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4=
firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/OpenIMSDK/protocol v0.0.14 h1:cvQ3f8MTcyYygAnZ7Exq6zIbvHGCEV0fWdpzjQEDDBQ=
github.com/OpenIMSDK/protocol v0.0.14/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/protocol v0.0.15 h1:KrrvdHH9kFF/tFYL2FXRPAr2e5F5DctSHfHq6MQjUI4=
github.com/OpenIMSDK/protocol v0.0.15/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.14 h1:WLof/+WxyPyRST+QkoTKubYCiV73uCLiL8pgnpH/yKQ=
github.com/OpenIMSDK/tools v0.0.14/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=

@ -167,9 +167,15 @@ func (m *MessageApi) DeleteMsgPhysical(c *gin.Context) {
func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendMsgReq *msg.SendMsgReq, err error) {
var data interface{}
log.ZDebug(c, "getSendMsgReq", "req", req.Content)
switch req.ContentType {
case constant.Text:
data = apistruct.TextElem{}
text, ok := req.Content["content"].(string)
if !ok {
return nil, errs.ErrArgs.WithDetail("text is not string")
}
data = apistruct.TextContentElem{Content: text}
log.ZDebug(c, "getSendMsgReq", "data", data)
case constant.Picture:
data = apistruct.PictureElem{}
case constant.Voice:
@ -195,7 +201,7 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
if err := mapstructure.WeakDecode(req.Content, &data); err != nil {
return nil, err
}
log.ZDebug(c, "getSendMsgReq", "data", data)
log.ZDebug(c, "getSendMsgReq", "req", req.Content)
if err := m.validate.Struct(data); err != nil {
return nil, err
}

@ -49,6 +49,9 @@ func UpdateGroupInfoMap(ctx context.Context, group *sdkws.GroupInfoForSet) map[s
if group.ApplyMemberFriend != nil {
m["apply_member_friend"] = group.ApplyMemberFriend.Value
}
if group.Ex != nil {
m["ex"] = group.Ex.Value
}
return m
}

@ -76,6 +76,10 @@ type TextElem struct {
Text string `mapstructure:"text" validate:"required"`
}
type TextContentElem struct {
Content string `json:"content" validate:"required"`
}
type RevokeElem struct {
RevokeMsgClientID string `mapstructure:"revokeMsgClientID" validate:"required"`
}

@ -84,6 +84,13 @@ type configStruct struct {
Username string `yaml:"username"`
Password string `yaml:"password"`
Addr []string `yaml:"addr"`
TLS *struct {
CACrt string `yaml:"caCrt"`
ClientCrt string `yaml:"clientCrt"`
ClientKey string `yaml:"clientKey"`
ClientKeyPwd string `yaml:"clientKeyPwd"`
InsecureSkipVerify bool `yaml:"insecureSkipVerify"`
} `yaml:"tls"`
LatestMsgToRedis struct {
Topic string `yaml:"topic"`
} `yaml:"latestMsgToRedis"`

@ -40,6 +40,7 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer {
consumerConfig.Net.SASL.User = config.Config.Kafka.Username
consumerConfig.Net.SASL.Password = config.Config.Kafka.Password
}
SetupTLSConfig(consumerConfig)
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
if err != nil {
panic(err.Error())

@ -17,6 +17,7 @@ package kafka
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/tools/log"
"github.com/Shopify/sarama"
@ -35,11 +36,17 @@ type MConsumerGroupConfig struct {
}
func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup {
config := sarama.NewConfig()
config.Version = consumerConfig.KafkaVersion
config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
config.Consumer.Return.Errors = consumerConfig.IsReturnErr
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config)
consumerGroupConfig := sarama.NewConfig()
consumerGroupConfig.Version = consumerConfig.KafkaVersion
consumerGroupConfig.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
consumerGroupConfig.Consumer.Return.Errors = consumerConfig.IsReturnErr
if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
consumerGroupConfig.Net.SASL.Enable = true
consumerGroupConfig.Net.SASL.User = config.Config.Kafka.Username
consumerGroupConfig.Net.SASL.Password = config.Config.Kafka.Password
}
SetupTLSConfig(consumerGroupConfig)
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig)
if err != nil {
panic(err.Error())
}

@ -60,6 +60,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
}
p.addr = addr
p.topic = topic
SetupTLSConfig(p.config)
var producer sarama.SyncProducer
var err error
for i := 0; i <= maxRetry; i++ {

@ -0,0 +1,20 @@
package kafka
import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tls"
"github.com/Shopify/sarama"
)
// SetupTLSConfig set up the TLS config from config file.
func SetupTLSConfig(cfg *sarama.Config) {
if config.Config.Kafka.TLS != nil {
cfg.Net.TLS.Enable = true
cfg.Net.TLS.Config = tls.NewTLSConfig(
config.Config.Kafka.TLS.ClientCrt,
config.Config.Kafka.TLS.ClientKey,
config.Config.Kafka.TLS.CACrt,
[]byte(config.Config.Kafka.TLS.ClientKeyPwd),
)
}
}

@ -0,0 +1,71 @@
package tls
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"errors"
"os"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
)
func decryptPEM(data []byte, passphrase []byte) ([]byte, error) {
if len(passphrase) == 0 {
return data, nil
}
b, _ := pem.Decode(data)
d, err := x509.DecryptPEMBlock(b, passphrase)
if err != nil {
return nil, err
}
return pem.EncodeToMemory(&pem.Block{
Type: b.Type,
Bytes: d,
}), nil
}
func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
return decryptPEM(data, pwd)
}
// NewTLSConfig setup the TLS config from general config file.
func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte) *tls.Config {
tlsConfig := tls.Config{}
if clientCertFile != "" && clientKeyFile != "" {
certPEMBlock, err := os.ReadFile(clientCertFile)
if err != nil {
panic(err)
}
keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd)
if err != nil {
panic(err)
}
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
if err != nil {
panic(err)
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
caCert, err := os.ReadFile(caCertFile)
if err != nil {
panic(err)
}
caCertPool := x509.NewCertPool()
ok := caCertPool.AppendCertsFromPEM(caCert)
if !ok {
panic(errors.New("not a valid CA cert"))
}
tlsConfig.RootCAs = caCertPool
tlsConfig.InsecureSkipVerify = config.Config.Kafka.TLS.InsecureSkipVerify
return &tlsConfig
}

@ -39,6 +39,7 @@ import (
"gorm.io/gorm"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/minio/minio-go/v7/pkg/credentials"
)
@ -274,6 +275,7 @@ func checkKafka() error {
cfg.Net.SASL.User = config.Config.Kafka.Username
cfg.Net.SASL.Password = config.Config.Kafka.Password
}
kafka.SetupTLSConfig(cfg)
kafkaClient, err := sarama.NewClient(config.Config.Kafka.Addr, cfg)
if err != nil {
return errs.Wrap(err)

Loading…
Cancel
Save