diff --git a/.github/sync-release.yml b/.github/sync-release.yml new file mode 100644 index 000000000..9d111aefd --- /dev/null +++ b/.github/sync-release.yml @@ -0,0 +1,16 @@ +OpenIMSDK/openim-docker: + - source: ./config + dest: ./openim-server/release/config + replace: true + - source: ./docs + dest: ./openim-server/release/docs + replace: true + - source: ./scripts + dest: ./openim-server/release/scripts + replace: true + - source: ./scripts + dest: ./scripts + replace: true + - source: ./Makefile + dest: ./Makefile + replace: true diff --git a/.github/sync.yml b/.github/sync.yml index 7ffefc79a..ee667d415 100644 --- a/.github/sync.yml +++ b/.github/sync.yml @@ -75,7 +75,7 @@ OpenIMSDK/OpenKF: dest: .github/.codecov.yml replace: false -openim-docker/openim-docker: +OpenIMSDK/openim-docker: - source: ./config dest: ./openim-server/main/config replace: true @@ -85,6 +85,12 @@ openim-docker/openim-docker: - source: ./scripts dest: ./openim-server/main/scripts replace: true + - source: ./scripts + dest: ./scripts + replace: true + - source: ./Makefile + dest: ./Makefile + replace: true group: # first group:common to all warehouses diff --git a/.github/workflows/sync-release.yml b/.github/workflows/sync-release.yml new file mode 100644 index 000000000..839040aff --- /dev/null +++ b/.github/workflows/sync-release.yml @@ -0,0 +1,43 @@ +# Copyright © 2023 KubeCub open source community. All rights reserved. +# Licensed under the MIT License (the "License"); +# you may not use this file except in compliance with the License. + +# https://github.com/BetaHuhn/repo-file-sync-action +name: Synchronize kubecub public code to other repositories +on: + push: + paths: + - scripts/* + - docs/* + - config/* + branches: + - release-v*.* + workflow_dispatch: + +jobs: + sync: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Run GitHub File Sync + uses: BetaHuhn/repo-file-sync-action@latest + with: + GH_INSTALLATION_TOKEN: "${{ secrets.BOT_GITHUB_TOKEN }}" + CONFIG_PATH: .github/sync-release.yml + ORIGINAL_MESSAGE: true + SKIP_PR: true + COMMIT_EACH_FILE: false + COMMIT_BODY: "🤖 kubbot to synchronize the warehouse" + GIT_EMAIL: "3293172751ysy@gmail.com" + GIT_USERNAME: "kubbot" + PR_BODY: 👌 kubecub provides automated community services + REVIEWERS: | + kubbot + cubxxw + PR_LABELS: | + file-sync + automerge + ASSIGNEES: | + kubbot diff --git a/internal/api/msg.go b/internal/api/msg.go index 3d6d4bc5d..2bbae9765 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -167,9 +167,15 @@ func (m *MessageApi) DeleteMsgPhysical(c *gin.Context) { func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendMsgReq *msg.SendMsgReq, err error) { var data interface{} + log.ZDebug(c, "getSendMsgReq", "req", req.Content) switch req.ContentType { case constant.Text: - data = apistruct.TextElem{} + text, ok := req.Content["text"].(string) + if !ok { + return nil, errs.ErrArgs.WithDetail("text is not string") + } + data = apistruct.TextContentElem{Content: text} + log.ZDebug(c, "getSendMsgReq", "data", data) case constant.Picture: data = apistruct.PictureElem{} case constant.Voice: @@ -195,7 +201,7 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM if err := mapstructure.WeakDecode(req.Content, &data); err != nil { return nil, err } - log.ZDebug(c, "getSendMsgReq", "data", data) + log.ZDebug(c, "getSendMsgReq", "req", req.Content) if err := m.validate.Struct(data); err != nil { return nil, err } diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 00ea68392..0efcd1d81 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -41,11 +41,12 @@ import ( ) type friendServer struct { - friendDatabase controller.FriendDatabase - blackDatabase controller.BlackDatabase - userRpcClient *rpcclient.UserRpcClient - notificationSender *notification.FriendNotificationSender - RegisterCenter registry.SvcDiscoveryRegistry + friendDatabase controller.FriendDatabase + blackDatabase controller.BlackDatabase + userRpcClient *rpcclient.UserRpcClient + notificationSender *notification.FriendNotificationSender + conversationRpcClient rpcclient.ConversationRpcClient + RegisterCenter registry.SvcDiscoveryRegistry } func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { @@ -79,9 +80,10 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { blackDB, cache.NewBlackCacheRedis(rdb, blackDB, cache.GetDefaultOpt()), ), - userRpcClient: &userRpcClient, - notificationSender: notificationSender, - RegisterCenter: client, + userRpcClient: &userRpcClient, + notificationSender: notificationSender, + RegisterCenter: client, + conversationRpcClient: rpcclient.NewConversationRpcClient(client), }) return nil } @@ -131,17 +133,22 @@ func (s *friendServer) ImportFriends( if _, err := s.userRpcClient.GetUsersInfo(ctx, append([]string{req.OwnerUserID}, req.FriendUserIDs...)); err != nil { return nil, err } - if utils.Contain(req.OwnerUserID, req.FriendUserIDs...) { return nil, errs.ErrCanNotAddYourself.Wrap() } if utils.Duplicate(req.FriendUserIDs) { return nil, errs.ErrArgs.Wrap("friend userID repeated") } - if err := s.friendDatabase.BecomeFriends(ctx, req.OwnerUserID, req.FriendUserIDs, constant.BecomeFriendByImport); err != nil { return nil, err } + for _, userID := range req.FriendUserIDs { + s.notificationSender.FriendApplicationAgreedNotification(ctx, &pbfriend.RespondFriendApplyReq{ + FromUserID: req.OwnerUserID, + ToUserID: userID, + HandleResult: constant.FriendResponseAgree, + }) + } return &pbfriend.ImportFriendResp{}, nil } diff --git a/pkg/apistruct/msg.go b/pkg/apistruct/msg.go index a0c5e1429..d31752f38 100644 --- a/pkg/apistruct/msg.go +++ b/pkg/apistruct/msg.go @@ -76,6 +76,10 @@ type TextElem struct { Text string `mapstructure:"text" validate:"required"` } +type TextContentElem struct { + Content string `json:"content" validate:"required"` +} + type RevokeElem struct { RevokeMsgClientID string `mapstructure:"revokeMsgClientID" validate:"required"` } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 754753b3b..c6dd41419 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -81,9 +81,16 @@ type configStruct struct { } `yaml:"redis"` Kafka struct { - Username string `yaml:"username"` - Password string `yaml:"password"` - Addr []string `yaml:"addr"` + Username string `yaml:"username"` + Password string `yaml:"password"` + Addr []string `yaml:"addr"` + TLS *struct { + CACrt string `yaml:"caCrt"` + ClientCrt string `yaml:"clientCrt"` + ClientKey string `yaml:"clientKey"` + ClientKeyPwd string `yaml:"clientKeyPwd"` + InsecureSkipVerify bool `yaml:"insecureSkipVerify"` + } `yaml:"tls"` LatestMsgToRedis struct { Topic string `yaml:"topic"` } `yaml:"latestMsgToRedis"` diff --git a/pkg/common/kafka/consumer.go b/pkg/common/kafka/consumer.go index 67bc3977b..e253ec5e0 100644 --- a/pkg/common/kafka/consumer.go +++ b/pkg/common/kafka/consumer.go @@ -40,6 +40,7 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer { consumerConfig.Net.SASL.User = config.Config.Kafka.Username consumerConfig.Net.SASL.Password = config.Config.Kafka.Password } + SetupTLSConfig(consumerConfig) consumer, err := sarama.NewConsumer(p.addr, consumerConfig) if err != nil { panic(err.Error()) diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index b4bd81660..da62fbe65 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -19,6 +19,8 @@ import ( "github.com/OpenIMSDK/tools/log" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/Shopify/sarama" ) @@ -35,11 +37,17 @@ type MConsumerGroupConfig struct { } func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup { - config := sarama.NewConfig() - config.Version = consumerConfig.KafkaVersion - config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial - config.Consumer.Return.Errors = consumerConfig.IsReturnErr - consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config) + consumerGroupConfig := sarama.NewConfig() + consumerGroupConfig.Version = consumerConfig.KafkaVersion + consumerGroupConfig.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial + consumerGroupConfig.Consumer.Return.Errors = consumerConfig.IsReturnErr + if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" { + consumerGroupConfig.Net.SASL.Enable = true + consumerGroupConfig.Net.SASL.User = config.Config.Kafka.Username + consumerGroupConfig.Net.SASL.Password = config.Config.Kafka.Password + } + SetupTLSConfig(consumerGroupConfig) + consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig) if err != nil { panic(err.Error()) } diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 938757d40..b7ec32714 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -60,6 +60,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer { } p.addr = addr p.topic = topic + SetupTLSConfig(p.config) var producer sarama.SyncProducer var err error for i := 0; i <= maxRetry; i++ { diff --git a/pkg/common/kafka/util.go b/pkg/common/kafka/util.go new file mode 100644 index 000000000..9d5678648 --- /dev/null +++ b/pkg/common/kafka/util.go @@ -0,0 +1,21 @@ +package kafka + +import ( + "github.com/Shopify/sarama" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tls" +) + +// SetupTLSConfig set up the TLS config from config file. +func SetupTLSConfig(cfg *sarama.Config) { + if config.Config.Kafka.TLS != nil { + cfg.Net.TLS.Enable = true + cfg.Net.TLS.Config = tls.NewTLSConfig( + config.Config.Kafka.TLS.ClientCrt, + config.Config.Kafka.TLS.ClientKey, + config.Config.Kafka.TLS.CACrt, + []byte(config.Config.Kafka.TLS.ClientKeyPwd), + ) + } +} diff --git a/pkg/common/tls/tls.go b/pkg/common/tls/tls.go new file mode 100644 index 000000000..5f84f87e3 --- /dev/null +++ b/pkg/common/tls/tls.go @@ -0,0 +1,70 @@ +package tls + +import ( + "crypto/tls" + "crypto/x509" + "encoding/pem" + "errors" + "os" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" +) + +func decryptPEM(data []byte, passphrase []byte) ([]byte, error) { + if len(passphrase) == 0 { + return data, nil + } + b, _ := pem.Decode(data) + d, err := x509.DecryptPEMBlock(b, passphrase) + if err != nil { + return nil, err + } + return pem.EncodeToMemory(&pem.Block{ + Type: b.Type, + Bytes: d, + }), nil +} + +func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + return decryptPEM(data, pwd) +} + +// NewTLSConfig setup the TLS config from general config file. +func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte) *tls.Config { + tlsConfig := tls.Config{} + + if clientCertFile != "" && clientKeyFile != "" { + certPEMBlock, err := os.ReadFile(clientCertFile) + if err != nil { + panic(err) + } + keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd) + if err != nil { + panic(err) + } + cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock) + if err != nil { + panic(err) + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + + caCert, err := os.ReadFile(caCertFile) + if err != nil { + panic(err) + } + caCertPool := x509.NewCertPool() + ok := caCertPool.AppendCertsFromPEM(caCert) + if !ok { + panic(errors.New("not a valid CA cert")) + } + tlsConfig.RootCAs = caCertPool + + tlsConfig.InsecureSkipVerify = config.Config.Kafka.TLS.InsecureSkipVerify + + return &tlsConfig +} diff --git a/pkg/rpcclient/notification/group.go b/pkg/rpcclient/notification/group.go index 87c657efb..ee62f08b4 100644 --- a/pkg/rpcclient/notification/group.go +++ b/pkg/rpcclient/notification/group.go @@ -18,6 +18,8 @@ import ( "context" "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/authverify" + "github.com/OpenIMSDK/protocol/constant" pbgroup "github.com/OpenIMSDK/protocol/group" "github.com/OpenIMSDK/protocol/sdkws" @@ -235,11 +237,20 @@ func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws } userID := mcontext.GetOpUserID(ctx) if groupID != "" { - member, err := g.db.TakeGroupMember(ctx, groupID, userID) - if err == nil { - *opUser = g.groupMemberDB2PB(member, 0) - } else if !errs.ErrRecordNotFound.Is(err) { - return err + if authverify.IsManagerUserID(userID) { + *opUser = &sdkws.GroupMemberFullInfo{ + GroupID: groupID, + UserID: userID, + RoleLevel: constant.GroupAdmin, + AppMangerLevel: constant.AppAdmin, + } + } else { + member, err := g.db.TakeGroupMember(ctx, groupID, userID) + if err == nil { + *opUser = g.groupMemberDB2PB(member, 0) + } else if !errs.ErrRecordNotFound.Is(err) { + return err + } } } user, err := g.getUser(ctx, userID) diff --git a/tools/component/component.go b/tools/component/component.go index 295ac44b1..be454f900 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -39,6 +39,7 @@ import ( "gorm.io/gorm" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka" "github.com/minio/minio-go/v7/pkg/credentials" ) @@ -274,6 +275,7 @@ func checkKafka() error { cfg.Net.SASL.User = config.Config.Kafka.Username cfg.Net.SASL.Password = config.Config.Kafka.Password } + kafka.SetupTLSConfig(cfg) kafkaClient, err := sarama.NewClient(config.Config.Kafka.Addr, cfg) if err != nil { return errs.Wrap(err)