Merge branch 'OpenIMSDK:main' into data-conversion-kafka

pull/983/head
pluto 2 years ago committed by GitHub
commit 412e1484fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,16 @@
OpenIMSDK/openim-docker:
- source: ./config
dest: ./openim-server/release/config
replace: true
- source: ./docs
dest: ./openim-server/release/docs
replace: true
- source: ./scripts
dest: ./openim-server/release/scripts
replace: true
- source: ./scripts
dest: ./scripts
replace: true
- source: ./Makefile
dest: ./Makefile
replace: true

8
.github/sync.yml vendored

@ -75,7 +75,7 @@ OpenIMSDK/OpenKF:
dest: .github/.codecov.yml dest: .github/.codecov.yml
replace: false replace: false
openim-docker/openim-docker: OpenIMSDK/openim-docker:
- source: ./config - source: ./config
dest: ./openim-server/main/config dest: ./openim-server/main/config
replace: true replace: true
@ -85,6 +85,12 @@ openim-docker/openim-docker:
- source: ./scripts - source: ./scripts
dest: ./openim-server/main/scripts dest: ./openim-server/main/scripts
replace: true replace: true
- source: ./scripts
dest: ./scripts
replace: true
- source: ./Makefile
dest: ./Makefile
replace: true
group: group:
# first groupcommon to all warehouses # first groupcommon to all warehouses

@ -0,0 +1,43 @@
# Copyright © 2023 KubeCub open source community. All rights reserved.
# Licensed under the MIT License (the "License");
# you may not use this file except in compliance with the License.
# https://github.com/BetaHuhn/repo-file-sync-action
name: Synchronize kubecub public code to other repositories
on:
push:
paths:
- scripts/*
- docs/*
- config/*
branches:
- release-v*.*
workflow_dispatch:
jobs:
sync:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Run GitHub File Sync
uses: BetaHuhn/repo-file-sync-action@latest
with:
GH_INSTALLATION_TOKEN: "${{ secrets.BOT_GITHUB_TOKEN }}"
CONFIG_PATH: .github/sync-release.yml
ORIGINAL_MESSAGE: true
SKIP_PR: true
COMMIT_EACH_FILE: false
COMMIT_BODY: "🤖 kubbot to synchronize the warehouse"
GIT_EMAIL: "3293172751ysy@gmail.com"
GIT_USERNAME: "kubbot"
PR_BODY: 👌 kubecub provides automated community services
REVIEWERS: |
kubbot
cubxxw
PR_LABELS: |
file-sync
automerge
ASSIGNEES: |
kubbot

@ -167,9 +167,15 @@ func (m *MessageApi) DeleteMsgPhysical(c *gin.Context) {
func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendMsgReq *msg.SendMsgReq, err error) { func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendMsgReq *msg.SendMsgReq, err error) {
var data interface{} var data interface{}
log.ZDebug(c, "getSendMsgReq", "req", req.Content)
switch req.ContentType { switch req.ContentType {
case constant.Text: case constant.Text:
data = apistruct.TextElem{} text, ok := req.Content["text"].(string)
if !ok {
return nil, errs.ErrArgs.WithDetail("text is not string")
}
data = apistruct.TextContentElem{Content: text}
log.ZDebug(c, "getSendMsgReq", "data", data)
case constant.Picture: case constant.Picture:
data = apistruct.PictureElem{} data = apistruct.PictureElem{}
case constant.Voice: case constant.Voice:
@ -195,7 +201,7 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
if err := mapstructure.WeakDecode(req.Content, &data); err != nil { if err := mapstructure.WeakDecode(req.Content, &data); err != nil {
return nil, err return nil, err
} }
log.ZDebug(c, "getSendMsgReq", "data", data) log.ZDebug(c, "getSendMsgReq", "req", req.Content)
if err := m.validate.Struct(data); err != nil { if err := m.validate.Struct(data); err != nil {
return nil, err return nil, err
} }

@ -45,6 +45,7 @@ type friendServer struct {
blackDatabase controller.BlackDatabase blackDatabase controller.BlackDatabase
userRpcClient *rpcclient.UserRpcClient userRpcClient *rpcclient.UserRpcClient
notificationSender *notification.FriendNotificationSender notificationSender *notification.FriendNotificationSender
conversationRpcClient rpcclient.ConversationRpcClient
RegisterCenter registry.SvcDiscoveryRegistry RegisterCenter registry.SvcDiscoveryRegistry
} }
@ -82,6 +83,7 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
userRpcClient: &userRpcClient, userRpcClient: &userRpcClient,
notificationSender: notificationSender, notificationSender: notificationSender,
RegisterCenter: client, RegisterCenter: client,
conversationRpcClient: rpcclient.NewConversationRpcClient(client),
}) })
return nil return nil
} }
@ -131,17 +133,22 @@ func (s *friendServer) ImportFriends(
if _, err := s.userRpcClient.GetUsersInfo(ctx, append([]string{req.OwnerUserID}, req.FriendUserIDs...)); err != nil { if _, err := s.userRpcClient.GetUsersInfo(ctx, append([]string{req.OwnerUserID}, req.FriendUserIDs...)); err != nil {
return nil, err return nil, err
} }
if utils.Contain(req.OwnerUserID, req.FriendUserIDs...) { if utils.Contain(req.OwnerUserID, req.FriendUserIDs...) {
return nil, errs.ErrCanNotAddYourself.Wrap() return nil, errs.ErrCanNotAddYourself.Wrap()
} }
if utils.Duplicate(req.FriendUserIDs) { if utils.Duplicate(req.FriendUserIDs) {
return nil, errs.ErrArgs.Wrap("friend userID repeated") return nil, errs.ErrArgs.Wrap("friend userID repeated")
} }
if err := s.friendDatabase.BecomeFriends(ctx, req.OwnerUserID, req.FriendUserIDs, constant.BecomeFriendByImport); err != nil { if err := s.friendDatabase.BecomeFriends(ctx, req.OwnerUserID, req.FriendUserIDs, constant.BecomeFriendByImport); err != nil {
return nil, err return nil, err
} }
for _, userID := range req.FriendUserIDs {
s.notificationSender.FriendApplicationAgreedNotification(ctx, &pbfriend.RespondFriendApplyReq{
FromUserID: req.OwnerUserID,
ToUserID: userID,
HandleResult: constant.FriendResponseAgree,
})
}
return &pbfriend.ImportFriendResp{}, nil return &pbfriend.ImportFriendResp{}, nil
} }

@ -76,6 +76,10 @@ type TextElem struct {
Text string `mapstructure:"text" validate:"required"` Text string `mapstructure:"text" validate:"required"`
} }
type TextContentElem struct {
Content string `json:"content" validate:"required"`
}
type RevokeElem struct { type RevokeElem struct {
RevokeMsgClientID string `mapstructure:"revokeMsgClientID" validate:"required"` RevokeMsgClientID string `mapstructure:"revokeMsgClientID" validate:"required"`
} }

@ -84,6 +84,13 @@ type configStruct struct {
Username string `yaml:"username"` Username string `yaml:"username"`
Password string `yaml:"password"` Password string `yaml:"password"`
Addr []string `yaml:"addr"` Addr []string `yaml:"addr"`
TLS *struct {
CACrt string `yaml:"caCrt"`
ClientCrt string `yaml:"clientCrt"`
ClientKey string `yaml:"clientKey"`
ClientKeyPwd string `yaml:"clientKeyPwd"`
InsecureSkipVerify bool `yaml:"insecureSkipVerify"`
} `yaml:"tls"`
LatestMsgToRedis struct { LatestMsgToRedis struct {
Topic string `yaml:"topic"` Topic string `yaml:"topic"`
} `yaml:"latestMsgToRedis"` } `yaml:"latestMsgToRedis"`

@ -40,6 +40,7 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer {
consumerConfig.Net.SASL.User = config.Config.Kafka.Username consumerConfig.Net.SASL.User = config.Config.Kafka.Username
consumerConfig.Net.SASL.Password = config.Config.Kafka.Password consumerConfig.Net.SASL.Password = config.Config.Kafka.Password
} }
SetupTLSConfig(consumerConfig)
consumer, err := sarama.NewConsumer(p.addr, consumerConfig) consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
if err != nil { if err != nil {
panic(err.Error()) panic(err.Error())

@ -19,6 +19,8 @@ import (
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
) )
@ -35,11 +37,17 @@ type MConsumerGroupConfig struct {
} }
func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup { func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup {
config := sarama.NewConfig() consumerGroupConfig := sarama.NewConfig()
config.Version = consumerConfig.KafkaVersion consumerGroupConfig.Version = consumerConfig.KafkaVersion
config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial consumerGroupConfig.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
config.Consumer.Return.Errors = consumerConfig.IsReturnErr consumerGroupConfig.Consumer.Return.Errors = consumerConfig.IsReturnErr
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config) if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
consumerGroupConfig.Net.SASL.Enable = true
consumerGroupConfig.Net.SASL.User = config.Config.Kafka.Username
consumerGroupConfig.Net.SASL.Password = config.Config.Kafka.Password
}
SetupTLSConfig(consumerGroupConfig)
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig)
if err != nil { if err != nil {
panic(err.Error()) panic(err.Error())
} }

@ -60,6 +60,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
} }
p.addr = addr p.addr = addr
p.topic = topic p.topic = topic
SetupTLSConfig(p.config)
var producer sarama.SyncProducer var producer sarama.SyncProducer
var err error var err error
for i := 0; i <= maxRetry; i++ { for i := 0; i <= maxRetry; i++ {

@ -0,0 +1,21 @@
package kafka
import (
"github.com/Shopify/sarama"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tls"
)
// SetupTLSConfig set up the TLS config from config file.
func SetupTLSConfig(cfg *sarama.Config) {
if config.Config.Kafka.TLS != nil {
cfg.Net.TLS.Enable = true
cfg.Net.TLS.Config = tls.NewTLSConfig(
config.Config.Kafka.TLS.ClientCrt,
config.Config.Kafka.TLS.ClientKey,
config.Config.Kafka.TLS.CACrt,
[]byte(config.Config.Kafka.TLS.ClientKeyPwd),
)
}
}

@ -0,0 +1,70 @@
package tls
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"errors"
"os"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
)
func decryptPEM(data []byte, passphrase []byte) ([]byte, error) {
if len(passphrase) == 0 {
return data, nil
}
b, _ := pem.Decode(data)
d, err := x509.DecryptPEMBlock(b, passphrase)
if err != nil {
return nil, err
}
return pem.EncodeToMemory(&pem.Block{
Type: b.Type,
Bytes: d,
}), nil
}
func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
return decryptPEM(data, pwd)
}
// NewTLSConfig setup the TLS config from general config file.
func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte) *tls.Config {
tlsConfig := tls.Config{}
if clientCertFile != "" && clientKeyFile != "" {
certPEMBlock, err := os.ReadFile(clientCertFile)
if err != nil {
panic(err)
}
keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd)
if err != nil {
panic(err)
}
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
if err != nil {
panic(err)
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
caCert, err := os.ReadFile(caCertFile)
if err != nil {
panic(err)
}
caCertPool := x509.NewCertPool()
ok := caCertPool.AppendCertsFromPEM(caCert)
if !ok {
panic(errors.New("not a valid CA cert"))
}
tlsConfig.RootCAs = caCertPool
tlsConfig.InsecureSkipVerify = config.Config.Kafka.TLS.InsecureSkipVerify
return &tlsConfig
}

@ -18,6 +18,8 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/authverify"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
pbgroup "github.com/OpenIMSDK/protocol/group" pbgroup "github.com/OpenIMSDK/protocol/group"
"github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/protocol/sdkws"
@ -235,6 +237,14 @@ func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws
} }
userID := mcontext.GetOpUserID(ctx) userID := mcontext.GetOpUserID(ctx)
if groupID != "" { if groupID != "" {
if authverify.IsManagerUserID(userID) {
*opUser = &sdkws.GroupMemberFullInfo{
GroupID: groupID,
UserID: userID,
RoleLevel: constant.GroupAdmin,
AppMangerLevel: constant.AppAdmin,
}
} else {
member, err := g.db.TakeGroupMember(ctx, groupID, userID) member, err := g.db.TakeGroupMember(ctx, groupID, userID)
if err == nil { if err == nil {
*opUser = g.groupMemberDB2PB(member, 0) *opUser = g.groupMemberDB2PB(member, 0)
@ -242,6 +252,7 @@ func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws
return err return err
} }
} }
}
user, err := g.getUser(ctx, userID) user, err := g.getUser(ctx, userID)
if err != nil { if err != nil {
return err return err

@ -39,6 +39,7 @@ import (
"gorm.io/gorm" "gorm.io/gorm"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/credentials"
) )
@ -274,6 +275,7 @@ func checkKafka() error {
cfg.Net.SASL.User = config.Config.Kafka.Username cfg.Net.SASL.User = config.Config.Kafka.Username
cfg.Net.SASL.Password = config.Config.Kafka.Password cfg.Net.SASL.Password = config.Config.Kafka.Password
} }
kafka.SetupTLSConfig(cfg)
kafkaClient, err := sarama.NewClient(config.Config.Kafka.Addr, cfg) kafkaClient, err := sarama.NewClient(config.Config.Kafka.Addr, cfg)
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)

Loading…
Cancel
Save