From b4c925e910542524f62da633f06471f2d5539ec5 Mon Sep 17 00:00:00 2001 From: wujunhui <344686925@qq.com> Date: Thu, 24 Jul 2025 15:46:32 +0800 Subject: [PATCH] feat: add end-to-end encryption support --- cmd/openim-rpc/encryption/main.go | 79 ++++ config/discovery.yml | 1 + config/openim-rpc-encryption.yml | 59 +++ internal/rpc/encryption/config.go | 111 ++++++ internal/rpc/encryption/handlers.go | 348 ++++++++++++++++++ internal/rpc/encryption/keys_manager.go | 287 +++++++++++++++ internal/rpc/encryption/server.go | 178 +++++++++ .../rpc/encryption/stores/identity_store.go | 91 +++++ internal/rpc/encryption/stores/interfaces.go | 59 +++ .../rpc/encryption/stores/prekey_store.go | 157 ++++++++ .../encryption/stores/signed_prekey_store.go | 187 ++++++++++ pkg/common/config/config.go | 2 + .../model/signal/signal_identity_keys.go | 67 ++++ .../storage/model/signal/signal_prekeys.go | 80 ++++ .../model/signal/signal_signed_prekeys.go | 80 ++++ start-config.yml | 1 + 16 files changed, 1787 insertions(+) create mode 100644 cmd/openim-rpc/encryption/main.go create mode 100644 config/openim-rpc-encryption.yml create mode 100644 internal/rpc/encryption/config.go create mode 100644 internal/rpc/encryption/handlers.go create mode 100644 internal/rpc/encryption/keys_manager.go create mode 100644 internal/rpc/encryption/server.go create mode 100644 internal/rpc/encryption/stores/identity_store.go create mode 100644 internal/rpc/encryption/stores/interfaces.go create mode 100644 internal/rpc/encryption/stores/prekey_store.go create mode 100644 internal/rpc/encryption/stores/signed_prekey_store.go create mode 100644 pkg/common/storage/model/signal/signal_identity_keys.go create mode 100644 pkg/common/storage/model/signal/signal_prekeys.go create mode 100644 pkg/common/storage/model/signal/signal_signed_prekeys.go diff --git a/cmd/openim-rpc/encryption/main.go b/cmd/openim-rpc/encryption/main.go new file mode 100644 index 000000000..c87a537b9 --- /dev/null +++ b/cmd/openim-rpc/encryption/main.go @@ -0,0 +1,79 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os" + "path/filepath" + + "github.com/openimsdk/open-im-server/v3/internal/rpc/encryption" + "gopkg.in/yaml.v3" +) + +func main() { + var configPath = flag.String("config", "./config", "path to config directory") + flag.Parse() + + // Load encryption service config independently + config, err := loadEncryptionConfig(*configPath) + if err != nil { + log.Fatalf("Failed to load encryption config: %v", err) + } + + // Start the encryption service + ctx := context.Background() + if err := encryption.Start(ctx, config); err != nil { + log.Fatalf("Failed to start encryption service: %v", err) + } +} + +// loadEncryptionConfig loads configuration from multiple files following OpenIM pattern +func loadEncryptionConfig(configDir string) (*encryption.Config, error) { + config := &encryption.Config{} + + // Load main encryption config (only contains rpc, prometheus, and encryption-specific settings) + encryptionConfigFile := filepath.Join(configDir, "openim-rpc-encryption.yml") + if err := loadYAMLFile(encryptionConfigFile, config); err != nil { + return nil, fmt.Errorf("failed to load encryption config: %w", err) + } + + // Load shared MongoDB configuration + mongoConfigFile := filepath.Join(configDir, "mongodb.yml") + if err := loadYAMLFile(mongoConfigFile, &config.MongodbConfig); err != nil { + return nil, fmt.Errorf("failed to load mongodb config: %w", err) + } + + // Load shared Discovery configuration + discoveryConfigFile := filepath.Join(configDir, "discovery.yml") + if err := loadYAMLFile(discoveryConfigFile, &config.Discovery); err != nil { + return nil, fmt.Errorf("failed to load discovery config: %w", err) + } + + return config, nil +} + +// loadYAMLFile loads a YAML file into the given struct +func loadYAMLFile(filename string, out interface{}) error { + data, err := os.ReadFile(filename) + if err != nil { + return err + } + + return yaml.Unmarshal(data, out) +} diff --git a/config/discovery.yml b/config/discovery.yml index 2251dceb7..c0824ef86 100644 --- a/config/discovery.yml +++ b/config/discovery.yml @@ -20,3 +20,4 @@ rpcService: auth: auth-rpc-service conversation: conversation-rpc-service third: third-rpc-service + encryption: encryption-rpc-service diff --git a/config/openim-rpc-encryption.yml b/config/openim-rpc-encryption.yml new file mode 100644 index 000000000..17bc31602 --- /dev/null +++ b/config/openim-rpc-encryption.yml @@ -0,0 +1,59 @@ +# Copyright © 2024 OpenIM. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# RPC Configuration (following OpenIM standard format) +rpc: + # API or other RPCs can access this RPC through this IP; if left blank, the internal network IP is obtained by default + registerIP: + # Listening IP; 0.0.0.0 means both internal and external IPs are listened to, if blank, the internal network IP is automatically obtained by default + listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + # if you use in kubernetes, set it to false + autoSetPorts: true + # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. + ports: [ 10800 ] + +prometheus: + # Whether to enable prometheus + enable: true + # Prometheus listening ports, must be consistent with the number of rpc.ports + # It will only take effect when autoSetPorts is set to false. + ports: [ 20800 ] + +# Encryption Configuration +encryption: + # Encryption mode: "aes", "signal", "hybrid" + mode: "aes" # Default to AES for compatibility + + # AES encryption settings (existing) + aes: + enabled: true + + # Signal Protocol settings + signal: + enabled: false # Set to true when ready to use Signal Protocol + preKeyBatch: 100 # Max one-time prekeys per upload + keyRotationInterval: "168h" # 7 days + sessionCleanupInterval: "720h" # 30 days + prekeyCleanupInterval: "168h" # 7 days + + # Security settings + maxOneTimePreKeys: 100 + maxSessionsPerDevice: 1000 + + # Validation settings + validateSignatures: true + requireIdentityKeys: true + diff --git a/internal/rpc/encryption/config.go b/internal/rpc/encryption/config.go new file mode 100644 index 000000000..856d8ebe7 --- /dev/null +++ b/internal/rpc/encryption/config.go @@ -0,0 +1,111 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package encryption + +import ( + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" +) + +// PrometheusConfig contains Prometheus configuration +type PrometheusConfig struct { + Enable bool `yaml:"enable"` + Ports []int `yaml:"ports"` +} + +// Config represents the configuration for encryption service +type Config struct { + RpcConfig config.RPC `yaml:"rpc"` + MongodbConfig config.Mongo `yaml:"mongo"` + Discovery config.Discovery `yaml:"discovery"` + PrometheusConfig PrometheusConfig `yaml:"prometheus"` + EncryptionConfig EncryptionConfig `yaml:"encryption"` +} + +// EncryptionConfig contains encryption-specific configuration +type EncryptionConfig struct { + Mode string `yaml:"mode"` // "aes", "signal", "hybrid" + AES AESConfig `yaml:"aes"` + Signal SignalConfig `yaml:"signal"` +} + +// AESConfig contains AES encryption configuration +type AESConfig struct { + Enabled bool `yaml:"enabled"` +} + +// SignalConfig contains Signal Protocol configuration +type SignalConfig struct { + Enabled bool `yaml:"enabled"` + PreKeyBatch int `yaml:"preKeyBatch"` + KeyRotationInterval time.Duration `yaml:"keyRotationInterval"` + SessionCleanupInterval time.Duration `yaml:"sessionCleanupInterval"` + PrekeyCleanupInterval time.Duration `yaml:"prekeyCleanupInterval"` + + // Security settings + MaxOneTimePreKeys int `yaml:"maxOneTimePreKeys"` + MaxSessionsPerDevice int `yaml:"maxSessionsPerDevice"` + + // Validation settings + ValidateSignatures bool `yaml:"validateSignatures"` + RequireIdentityKeys bool `yaml:"requireIdentityKeys"` +} + +// GetEncryptionMode returns the current encryption mode +func (c *Config) GetEncryptionMode() string { + if c.EncryptionConfig.Mode == "" { + return "aes" // default to AES for compatibility + } + return c.EncryptionConfig.Mode +} + +// IsSignalEnabled returns true if Signal Protocol is enabled +func (c *Config) IsSignalEnabled() bool { + return c.EncryptionConfig.Signal.Enabled && + (c.EncryptionConfig.Mode == "signal" || c.EncryptionConfig.Mode == "hybrid") +} + +// IsAESEnabled returns true if AES encryption is enabled +func (c *Config) IsAESEnabled() bool { + return c.EncryptionConfig.AES.Enabled || + c.EncryptionConfig.Mode == "aes" || + c.EncryptionConfig.Mode == "hybrid" +} + +// GetSignalConfig returns Signal Protocol configuration +func (c *Config) GetSignalConfig() *SignalConfig { + // Set defaults if not specified + if c.EncryptionConfig.Signal.PreKeyBatch == 0 { + c.EncryptionConfig.Signal.PreKeyBatch = 100 + } + if c.EncryptionConfig.Signal.KeyRotationInterval == 0 { + c.EncryptionConfig.Signal.KeyRotationInterval = 7 * 24 * time.Hour // 7 days + } + if c.EncryptionConfig.Signal.SessionCleanupInterval == 0 { + c.EncryptionConfig.Signal.SessionCleanupInterval = 30 * 24 * time.Hour // 30 days + } + if c.EncryptionConfig.Signal.PrekeyCleanupInterval == 0 { + c.EncryptionConfig.Signal.PrekeyCleanupInterval = 7 * 24 * time.Hour // 7 days + } + if c.EncryptionConfig.Signal.MaxOneTimePreKeys == 0 { + c.EncryptionConfig.Signal.MaxOneTimePreKeys = 100 + } + if c.EncryptionConfig.Signal.MaxSessionsPerDevice == 0 { + c.EncryptionConfig.Signal.MaxSessionsPerDevice = 1000 + } + + return &c.EncryptionConfig.Signal +} diff --git a/internal/rpc/encryption/handlers.go b/internal/rpc/encryption/handlers.go new file mode 100644 index 000000000..52ade1c18 --- /dev/null +++ b/internal/rpc/encryption/handlers.go @@ -0,0 +1,348 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package encryption + +import ( + "encoding/base64" + "net/http" + "strconv" + + "github.com/gin-gonic/gin" + "github.com/openimsdk/tools/log" +) + +// Request/Response structures for HTTP API +type GetPreKeysResponse struct { + IdentityKey *IdentityKeyInfo `json:"identityKey"` + SignedPreKey *SignedPreKeyInfo `json:"signedPreKey"` + OneTimePreKey *PreKeyInfo `json:"oneTimePreKey,omitempty"` + RegistrationID int32 `json:"registrationId"` +} + +type IdentityKeyInfo struct { + IdentityKey string `json:"identityKey"` + RegistrationID int32 `json:"registrationId"` + CreatedTime int64 `json:"createdTime"` +} + +type PreKeyInfo struct { + KeyID uint32 `json:"keyId"` + PublicKey string `json:"publicKey"` +} + +type SignedPreKeyInfo struct { + KeyID uint32 `json:"keyId"` + PublicKey string `json:"publicKey"` + Signature string `json:"signature"` + CreatedTime int64 `json:"createdTime"` +} + +type SetPreKeysRequest struct { + IdentityKey string `json:"identityKey,omitempty"` + SignedPreKey *SignedPreKeyInfo `json:"signedPreKey,omitempty"` + OneTimePreKeys []PreKeyInfo `json:"oneTimePreKeys,omitempty"` + RegistrationID int32 `json:"registrationId,omitempty"` +} + +type APIResponse struct { + Code int `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` +} + +// GetPreKeys handles GET /api/v1/encryption/prekeys/:user_id/:device_id +func (s *Server) GetPreKeys(c *gin.Context) { + userID := c.Param("user_id") + deviceIDStr := c.Param("device_id") + + deviceID, err := strconv.ParseInt(deviceIDStr, 10, 32) + if err != nil { + c.JSON(http.StatusBadRequest, APIResponse{ + Code: 400, + Message: "Invalid device_id", + }) + return + } + + log.ZInfo(c.Request.Context(), "GetPreKeys", "userID", userID, "deviceID", deviceID) + + // Get identity key + identityKey, err := s.keysManager.GetIdentityKey(c.Request.Context(), userID, int32(deviceID)) + if err != nil { + log.ZError(c.Request.Context(), "failed to get identity key", err) + c.JSON(http.StatusNotFound, APIResponse{ + Code: 404, + Message: "Identity key not found", + }) + return + } + + // Get signed prekey + signedPreKey, err := s.keysManager.GetActiveSignedPreKey(c.Request.Context(), userID, int32(deviceID)) + if err != nil { + log.ZError(c.Request.Context(), "failed to get signed prekey", err) + c.JSON(http.StatusNotFound, APIResponse{ + Code: 404, + Message: "Signed prekey not found", + }) + return + } + + // Get one-time prekey (optional) + oneTimePreKey, err := s.keysManager.GetOneTimePreKey(c.Request.Context(), userID, int32(deviceID)) + if err != nil { + log.ZWarn(c.Request.Context(), "no one-time prekey available", err) + oneTimePreKey = nil + } + + response := &GetPreKeysResponse{ + IdentityKey: &IdentityKeyInfo{ + IdentityKey: base64.StdEncoding.EncodeToString(identityKey.IdentityKey), + RegistrationID: identityKey.RegistrationID, + CreatedTime: identityKey.CreatedTime.Unix(), + }, + SignedPreKey: &SignedPreKeyInfo{ + KeyID: signedPreKey.KeyID, + PublicKey: base64.StdEncoding.EncodeToString(signedPreKey.PublicKey), + Signature: base64.StdEncoding.EncodeToString(signedPreKey.Signature), + CreatedTime: signedPreKey.CreatedTime.Unix(), + }, + RegistrationID: identityKey.RegistrationID, + } + + if oneTimePreKey != nil { + response.OneTimePreKey = &PreKeyInfo{ + KeyID: oneTimePreKey.KeyID, + PublicKey: base64.StdEncoding.EncodeToString(oneTimePreKey.PublicKey), + } + } + + c.JSON(http.StatusOK, APIResponse{ + Code: 0, + Message: "success", + Data: response, + }) +} + +// SetPreKeys handles POST /api/v1/encryption/prekeys/:user_id/:device_id +func (s *Server) SetPreKeys(c *gin.Context) { + userID := c.Param("user_id") + deviceIDStr := c.Param("device_id") + + deviceID, err := strconv.ParseInt(deviceIDStr, 10, 32) + if err != nil { + c.JSON(http.StatusBadRequest, APIResponse{ + Code: 400, + Message: "Invalid device_id", + }) + return + } + + var req SetPreKeysRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, APIResponse{ + Code: 400, + Message: "Invalid request body", + }) + return + } + + log.ZInfo(c.Request.Context(), "SetPreKeys", "userID", userID, "deviceID", deviceID) + + // Set identity key if provided + if req.IdentityKey != "" { + identityKeyBytes, err := base64.StdEncoding.DecodeString(req.IdentityKey) + if err != nil { + c.JSON(http.StatusBadRequest, APIResponse{ + Code: 400, + Message: "Invalid identity key encoding", + }) + return + } + + err = s.keysManager.SetIdentityKey(c.Request.Context(), userID, int32(deviceID), identityKeyBytes, req.RegistrationID) + if err != nil { + log.ZError(c.Request.Context(), "failed to set identity key", err) + c.JSON(http.StatusInternalServerError, APIResponse{ + Code: 500, + Message: "Failed to set identity key", + }) + return + } + } + + // Set signed prekey if provided + if req.SignedPreKey != nil { + publicKeyBytes, err := base64.StdEncoding.DecodeString(req.SignedPreKey.PublicKey) + if err != nil { + c.JSON(http.StatusBadRequest, APIResponse{ + Code: 400, + Message: "Invalid signed prekey public key encoding", + }) + return + } + + signatureBytes, err := base64.StdEncoding.DecodeString(req.SignedPreKey.Signature) + if err != nil { + c.JSON(http.StatusBadRequest, APIResponse{ + Code: 400, + Message: "Invalid signed prekey signature encoding", + }) + return + } + + signedPreKeyData := &SignedPreKeyResponse{ + KeyId: req.SignedPreKey.KeyID, + PublicKey: publicKeyBytes, + Signature: signatureBytes, + } + + err = s.keysManager.SetSignedPreKey(c.Request.Context(), userID, int32(deviceID), signedPreKeyData) + if err != nil { + log.ZError(c.Request.Context(), "failed to set signed prekey", err) + c.JSON(http.StatusInternalServerError, APIResponse{ + Code: 500, + Message: "Failed to set signed prekey", + }) + return + } + } + + // Set one-time prekeys + if len(req.OneTimePreKeys) > 0 { + var preKeyData []*PreKeyResponse + for _, pk := range req.OneTimePreKeys { + publicKeyBytes, err := base64.StdEncoding.DecodeString(pk.PublicKey) + if err != nil { + c.JSON(http.StatusBadRequest, APIResponse{ + Code: 400, + Message: "Invalid one-time prekey public key encoding", + }) + return + } + + preKeyData = append(preKeyData, &PreKeyResponse{ + KeyId: pk.KeyID, + PublicKey: publicKeyBytes, + }) + } + + acceptedCount, err := s.keysManager.SetOneTimePreKeys(c.Request.Context(), userID, int32(deviceID), preKeyData) + if err != nil { + log.ZError(c.Request.Context(), "failed to set one-time prekeys", err) + c.JSON(http.StatusInternalServerError, APIResponse{ + Code: 500, + Message: "Failed to set one-time prekeys", + }) + return + } + + c.JSON(http.StatusOK, APIResponse{ + Code: 0, + Message: "success", + Data: map[string]interface{}{ + "preKeysAccepted": acceptedCount, + }, + }) + return + } + + c.JSON(http.StatusOK, APIResponse{ + Code: 0, + Message: "success", + }) +} + +// GetPreKeyCount handles GET /api/v1/encryption/prekeys/:user_id/:device_id/count +func (s *Server) GetPreKeyCount(c *gin.Context) { + userID := c.Param("user_id") + deviceIDStr := c.Param("device_id") + + deviceID, err := strconv.ParseInt(deviceIDStr, 10, 32) + if err != nil { + c.JSON(http.StatusBadRequest, APIResponse{ + Code: 400, + Message: "Invalid device_id", + }) + return + } + + count, err := s.keysManager.GetPreKeyCount(c.Request.Context(), userID, int32(deviceID)) + if err != nil { + log.ZError(c.Request.Context(), "failed to get prekey count", err) + c.JSON(http.StatusInternalServerError, APIResponse{ + Code: 500, + Message: "Failed to get prekey count", + }) + return + } + + signedPreKeyExists, lastRotation, err := s.keysManager.GetSignedPreKeyInfo(c.Request.Context(), userID, int32(deviceID)) + if err != nil { + signedPreKeyExists = false + } + + data := map[string]interface{}{ + "oneTimePreKeyCount": count, + "signedPreKeyExists": signedPreKeyExists, + } + + if !lastRotation.IsZero() { + data["lastSignedPreKeyRotation"] = lastRotation.Unix() + } + + c.JSON(http.StatusOK, APIResponse{ + Code: 0, + Message: "success", + Data: data, + }) +} + +// GetIdentityKey handles GET /api/v1/encryption/identity/:user_id/:device_id +func (s *Server) GetIdentityKey(c *gin.Context) { + userID := c.Param("user_id") + deviceIDStr := c.Param("device_id") + + deviceID, err := strconv.ParseInt(deviceIDStr, 10, 32) + if err != nil { + c.JSON(http.StatusBadRequest, APIResponse{ + Code: 400, + Message: "Invalid device_id", + }) + return + } + + identityKey, err := s.keysManager.GetIdentityKey(c.Request.Context(), userID, int32(deviceID)) + if err != nil { + log.ZError(c.Request.Context(), "failed to get identity key", err) + c.JSON(http.StatusNotFound, APIResponse{ + Code: 404, + Message: "Identity key not found", + }) + return + } + + response := &IdentityKeyInfo{ + IdentityKey: base64.StdEncoding.EncodeToString(identityKey.IdentityKey), + RegistrationID: identityKey.RegistrationID, + CreatedTime: identityKey.CreatedTime.Unix(), + } + + c.JSON(http.StatusOK, APIResponse{ + Code: 0, + Message: "success", + Data: response, + }) +} \ No newline at end of file diff --git a/internal/rpc/encryption/keys_manager.go b/internal/rpc/encryption/keys_manager.go new file mode 100644 index 000000000..f072e088f --- /dev/null +++ b/internal/rpc/encryption/keys_manager.go @@ -0,0 +1,287 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package encryption + +import ( + "context" + "fmt" + "time" + + "github.com/openimsdk/open-im-server/v3/internal/rpc/encryption/stores" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model/signal" + "github.com/openimsdk/tools/log" +) + +const ( + // Maximum number of one-time prekeys that can be uploaded at once + MaxOneTimePreKeys = 100 +) + +type KeysManager struct { + identityStore stores.IdentityStoreInterface + preKeyStore stores.PreKeyStoreInterface + signedPreKeyStore stores.SignedPreKeyStoreInterface +} + +func NewKeysManager( + identityStore stores.IdentityStoreInterface, + preKeyStore stores.PreKeyStoreInterface, + signedPreKeyStore stores.SignedPreKeyStoreInterface, +) *KeysManager { + return &KeysManager{ + identityStore: identityStore, + preKeyStore: preKeyStore, + signedPreKeyStore: signedPreKeyStore, + } +} + +// GetIdentityKey retrieves the identity key for a user/device +func (km *KeysManager) GetIdentityKey(ctx context.Context, userID string, deviceID int32) (*signal.SignalIdentityKey, error) { + return km.identityStore.Get(ctx, userID, deviceID) +} + +// SetIdentityKey sets the identity key for a user/device +func (km *KeysManager) SetIdentityKey(ctx context.Context, userID string, deviceID int32, identityKey []byte, registrationID int32) error { + now := time.Now() + + // Check if identity key already exists + existing, err := km.identityStore.Get(ctx, userID, deviceID) + if err == nil && existing != nil { + // Update existing identity key + existing.IdentityKey = identityKey + existing.RegistrationID = registrationID + existing.UpdatedTime = now + return km.identityStore.Update(ctx, userID, deviceID, existing) + } + + // Create new identity key + identityKeyRecord := &signal.SignalIdentityKey{ + UserID: userID, + DeviceID: deviceID, + IdentityKey: identityKey, + RegistrationID: registrationID, + CreatedTime: now, + UpdatedTime: now, + } + + return km.identityStore.Create(ctx, identityKeyRecord) +} + +// GetActiveSignedPreKey retrieves the active signed prekey for a user/device +func (km *KeysManager) GetActiveSignedPreKey(ctx context.Context, userID string, deviceID int32) (*signal.SignalSignedPreKey, error) { + return km.signedPreKeyStore.GetActive(ctx, userID, deviceID) +} + +// SetSignedPreKey sets a signed prekey for a user/device +func (km *KeysManager) SetSignedPreKey(ctx context.Context, userID string, deviceID int32, signedPreKey *SignedPreKeyResponse) error { + now := time.Now() + + signedPreKeyRecord := &signal.SignalSignedPreKey{ + UserID: userID, + DeviceID: deviceID, + KeyID: signedPreKey.KeyId, + PublicKey: signedPreKey.PublicKey, + Signature: signedPreKey.Signature, + CreatedTime: now, + Active: true, + } + + // Deactivate existing signed prekeys and set this one as active + err := km.signedPreKeyStore.SetActive(ctx, userID, deviceID, signedPreKey.KeyId) + if err != nil { + log.ZWarn(ctx, "failed to deactivate existing signed prekeys", err) + } + + // Create or update the signed prekey + existing, err := km.signedPreKeyStore.GetByKeyID(ctx, userID, deviceID, signedPreKey.KeyId) + if err == nil && existing != nil { + return km.signedPreKeyStore.Update(ctx, userID, deviceID, signedPreKey.KeyId, signedPreKeyRecord) + } + + return km.signedPreKeyStore.Create(ctx, signedPreKeyRecord) +} + +// GetOneTimePreKey retrieves an available one-time prekey and marks it as used +func (km *KeysManager) GetOneTimePreKey(ctx context.Context, userID string, deviceID int32) (*signal.SignalPreKey, error) { + preKey, err := km.preKeyStore.GetAvailable(ctx, userID, deviceID) + if err != nil { + return nil, fmt.Errorf("no available one-time prekey: %w", err) + } + + // Mark the prekey as used + err = km.preKeyStore.MarkUsed(ctx, userID, deviceID, preKey.KeyID) + if err != nil { + log.ZError(ctx, "failed to mark prekey as used", err, "userID", userID, "deviceID", deviceID, "keyID", preKey.KeyID) + // Don't fail the request, but log the error + } + + return preKey, nil +} + +// SetOneTimePreKeys sets multiple one-time prekeys for a user/device +func (km *KeysManager) SetOneTimePreKeys(ctx context.Context, userID string, deviceID int32, preKeys []*PreKeyResponse) (int, error) { + if len(preKeys) > MaxOneTimePreKeys { + return 0, fmt.Errorf("too many one-time prekeys: %d (max: %d)", len(preKeys), MaxOneTimePreKeys) + } + + now := time.Now() + var preKeyRecords []*signal.SignalPreKey + + for _, preKey := range preKeys { + preKeyRecord := &signal.SignalPreKey{ + UserID: userID, + DeviceID: deviceID, + KeyID: preKey.KeyId, + PublicKey: preKey.PublicKey, + Used: false, + CreatedTime: now, + } + preKeyRecords = append(preKeyRecords, preKeyRecord) + } + + err := km.preKeyStore.CreateBatch(ctx, preKeyRecords) + if err != nil { + return 0, fmt.Errorf("failed to create one-time prekeys: %w", err) + } + + return len(preKeyRecords), nil +} + +// GetPreKeyCount returns the count of available one-time prekeys for a user/device +func (km *KeysManager) GetPreKeyCount(ctx context.Context, userID string, deviceID int32) (int64, error) { + return km.preKeyStore.CountAvailable(ctx, userID, deviceID) +} + +// GetSignedPreKeyInfo returns information about signed prekey existence and last rotation +func (km *KeysManager) GetSignedPreKeyInfo(ctx context.Context, userID string, deviceID int32) (exists bool, lastRotation time.Time, err error) { + signedPreKey, err := km.signedPreKeyStore.GetActive(ctx, userID, deviceID) + if err != nil { + return false, time.Time{}, err + } + + if signedPreKey != nil { + return true, signedPreKey.CreatedTime, nil + } + + return false, time.Time{}, nil +} + +// CleanupExpiredKeys removes expired keys from storage +func (km *KeysManager) CleanupExpiredKeys(ctx context.Context) error { + // Cleanup used one-time prekeys older than 7 days + usedPreKeysCleanedCount, err := km.preKeyStore.CleanupUsed(ctx, 7*24*time.Hour) + if err != nil { + log.ZError(ctx, "failed to cleanup used prekeys", err) + } else { + log.ZInfo(ctx, "cleaned up used prekeys", "count", usedPreKeysCleanedCount) + } + + // Cleanup inactive signed prekeys older than 30 days + inactiveSignedPreKeysCount, err := km.signedPreKeyStore.CleanupInactive(ctx, 30*24*time.Hour) + if err != nil { + log.ZError(ctx, "failed to cleanup inactive signed prekeys", err) + } else { + log.ZInfo(ctx, "cleaned up inactive signed prekeys", "count", inactiveSignedPreKeysCount) + } + + return nil +} + +// ValidateSignedPreKey validates the signature of a signed prekey +func (km *KeysManager) ValidateSignedPreKey(ctx context.Context, userID string, deviceID int32, signedPreKey *SignedPreKeyResponse) error { + // Get the identity key to verify the signature + _, err := km.GetIdentityKey(ctx, userID, deviceID) + if err != nil { + return fmt.Errorf("failed to get identity key for signature validation: %w", err) + } + + // TODO: Implement actual signature validation using Signal Protocol + // This would require integrating with the Signal Protocol library + // For now, we'll skip the validation + log.ZInfo(ctx, "signed prekey signature validation skipped (not implemented)", + "userID", userID, "deviceID", deviceID, "keyID", signedPreKey.KeyId) + + return nil +} + +// RotateSignedPreKey creates a new signed prekey and deactivates the old one +func (km *KeysManager) RotateSignedPreKey(ctx context.Context, userID string, deviceID int32, newSignedPreKey *SignedPreKeyResponse) error { + // Validate the new signed prekey + err := km.ValidateSignedPreKey(ctx, userID, deviceID, newSignedPreKey) + if err != nil { + return fmt.Errorf("signed prekey validation failed: %w", err) + } + + // Set the new signed prekey (this will automatically deactivate the old one) + return km.SetSignedPreKey(ctx, userID, deviceID, newSignedPreKey) +} + +// GetPreKeyBundleForUser retrieves all necessary keys for X3DH key agreement +func (km *KeysManager) GetPreKeyBundleForUser(ctx context.Context, userID string, deviceID int32) (map[string]interface{}, error) { + // Get identity key + identityKey, err := km.GetIdentityKey(ctx, userID, deviceID) + if err != nil { + return nil, fmt.Errorf("failed to get identity key: %w", err) + } + + // Get signed prekey + signedPreKey, err := km.GetActiveSignedPreKey(ctx, userID, deviceID) + if err != nil { + return nil, fmt.Errorf("failed to get signed prekey: %w", err) + } + + // Get one-time prekey (optional) + oneTimePreKey, err := km.GetOneTimePreKey(ctx, userID, deviceID) + if err != nil { + log.ZWarn(ctx, "no one-time prekey available", err) + oneTimePreKey = nil + } + + bundle := map[string]interface{}{ + "identityKey": map[string]interface{}{ + "identityKey": identityKey.IdentityKey, + "registrationId": identityKey.RegistrationID, + "createdTime": identityKey.CreatedTime.Unix(), + }, + "signedPreKey": map[string]interface{}{ + "keyId": signedPreKey.KeyID, + "publicKey": signedPreKey.PublicKey, + "signature": signedPreKey.Signature, + "createdTime": signedPreKey.CreatedTime.Unix(), + }, + "registrationId": identityKey.RegistrationID, + } + + if oneTimePreKey != nil { + bundle["oneTimePreKey"] = map[string]interface{}{ + "keyId": oneTimePreKey.KeyID, + "publicKey": oneTimePreKey.PublicKey, + } + } + + return bundle, nil +} + +// Response structures used in key manager +type SignedPreKeyResponse struct { + KeyId uint32 `json:"keyId"` + PublicKey []byte `json:"publicKey"` + Signature []byte `json:"signature"` +} + +type PreKeyResponse struct { + KeyId uint32 `json:"keyId"` + PublicKey []byte `json:"publicKey"` +} diff --git a/internal/rpc/encryption/server.go b/internal/rpc/encryption/server.go new file mode 100644 index 000000000..df21cbac0 --- /dev/null +++ b/internal/rpc/encryption/server.go @@ -0,0 +1,178 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package encryption + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/gin-gonic/gin" + "github.com/openimsdk/open-im-server/v3/internal/rpc/encryption/stores" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/log" +) + +type Server struct { + *Config + keysManager *KeysManager + discoveryConn discovery.Conn + httpServer *http.Server +} + +func Start(ctx context.Context, cfg *Config) error { + log.ZInfo(ctx, "encryption server start") + + // Initialize service registry + client, err := kdisc.NewDiscoveryRegister(&cfg.Discovery, nil) + if err != nil { + return err + } + + // Initialize MongoDB + mongoClient, err := mongoutil.NewMongoDB(ctx, cfg.MongodbConfig.Build()) + if err != nil { + return err + } + + // Get the specific database + db := mongoClient.GetDB() + + // Initialize stores + identityStore := stores.NewIdentityStore(db) + preKeyStore := stores.NewPreKeyStore(db) + signedPreKeyStore := stores.NewSignedPreKeyStore(db) + + // Initialize managers + keysManager := NewKeysManager(identityStore, preKeyStore, signedPreKeyStore) + + server := &Server{ + Config: cfg, + keysManager: keysManager, + discoveryConn: client, + } + + // Setup HTTP server + if err := server.setupHTTPServer(); err != nil { + return err + } + + // Start HTTP server + go func() { + if err := server.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.ZError(ctx, "HTTP server failed to start", err) + } + }() + + log.ZInfo(ctx, "encryption server started successfully", "port", cfg.RpcConfig.Ports[0]) + + // Keep the service running + select { + case <-ctx.Done(): + return server.shutdown(ctx) + } +} + +func (s *Server) setupHTTPServer() error { + gin.SetMode(gin.ReleaseMode) + router := gin.Default() + + // Add middleware + router.Use(gin.Recovery()) + router.Use(s.corsMiddleware()) + router.Use(s.loggingMiddleware()) + + // API routes + api := router.Group("/api/v1/encryption") + { + // Key management endpoints + api.GET("/prekeys/:user_id/:device_id", s.GetPreKeys) + api.POST("/prekeys/:user_id/:device_id", s.SetPreKeys) + api.GET("/prekeys/:user_id/:device_id/count", s.GetPreKeyCount) + api.GET("/identity/:user_id/:device_id", s.GetIdentityKey) + + // Health check + api.GET("/health", s.HealthCheck) + } + + // Create HTTP server + port := fmt.Sprintf(":%d", s.Config.RpcConfig.Ports[0]) + s.httpServer = &http.Server{ + Addr: port, + Handler: router, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + } + + return nil +} + +func (s *Server) shutdown(ctx context.Context) error { + log.ZInfo(ctx, "shutting down encryption server") + + // Shutdown HTTP server + if s.httpServer != nil { + shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := s.httpServer.Shutdown(shutdownCtx); err != nil { + log.ZError(ctx, "failed to shutdown HTTP server", err) + } + } + + return nil +} + +func (s *Server) corsMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + c.Header("Access-Control-Allow-Origin", "*") + c.Header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") + c.Header("Access-Control-Allow-Headers", "Content-Type, Authorization") + + if c.Request.Method == "OPTIONS" { + c.AbortWithStatus(204) + return + } + + c.Next() + } +} + +func (s *Server) loggingMiddleware() gin.HandlerFunc { + return gin.LoggerWithFormatter(func(param gin.LogFormatterParams) string { + return fmt.Sprintf("[%s] %s %s %d %s\n", + param.TimeStamp.Format("2006/01/02 - 15:04:05"), + param.Method, + param.Path, + param.StatusCode, + param.Latency, + ) + }) +} + +// HealthCheck handles GET /api/v1/encryption/health +func (s *Server) HealthCheck(c *gin.Context) { + c.JSON(200, APIResponse{ + Code: 0, + Message: "success", + Data: map[string]interface{}{ + "status": "healthy", + "mode": s.Config.GetEncryptionMode(), + "timestamp": time.Now().Unix(), + }, + }) +} \ No newline at end of file diff --git a/internal/rpc/encryption/stores/identity_store.go b/internal/rpc/encryption/stores/identity_store.go new file mode 100644 index 000000000..fa1e8c4be --- /dev/null +++ b/internal/rpc/encryption/stores/identity_store.go @@ -0,0 +1,91 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stores + +import ( + "context" + "fmt" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model/signal" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" +) + +type IdentityStore struct { + coll *mongo.Collection +} + +func NewIdentityStore(db *mongo.Database) IdentityStoreInterface { + coll := db.Collection(signal.SignalIdentityKeyCollection) + // Create indexes + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{{Key: "user_id", Value: 1}, {Key: "device_id", Value: 1}}, + Options: options.Index().SetUnique(true), + }, + {Keys: bson.D{{Key: "user_id", Value: 1}}}, + }) + if err != nil { + log.ZWarn(context.Background(), "failed to create indexes for identity store", err) + } + + return &IdentityStore{coll: coll} +} + +func (s *IdentityStore) Create(ctx context.Context, identityKey *signal.SignalIdentityKey) error { + return mongoutil.InsertOne(ctx, s.coll, identityKey) +} + +func (s *IdentityStore) Update(ctx context.Context, userID string, deviceID int32, identityKey *signal.SignalIdentityKey) error { + filter := bson.M{"user_id": userID, "device_id": deviceID} + update := bson.M{"$set": identityKey} + return mongoutil.UpdateOne(ctx, s.coll, filter, update, false) +} + +func (s *IdentityStore) Get(ctx context.Context, userID string, deviceID int32) (*signal.SignalIdentityKey, error) { + filter := bson.M{"user_id": userID, "device_id": deviceID} + identityKey, err := mongoutil.FindOne[*signal.SignalIdentityKey](ctx, s.coll, filter) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + return nil, fmt.Errorf("identity key not found for user %s device %d", userID, deviceID) + } + return nil, err + } + return identityKey, nil +} + +func (s *IdentityStore) Delete(ctx context.Context, userID string, deviceID int32) error { + filter := bson.M{"user_id": userID, "device_id": deviceID} + return mongoutil.DeleteOne(ctx, s.coll, filter) +} + +func (s *IdentityStore) GetByUserID(ctx context.Context, userID string) ([]*signal.SignalIdentityKey, error) { + filter := bson.M{"user_id": userID} + return mongoutil.Find[*signal.SignalIdentityKey](ctx, s.coll, filter) +} + +func (s *IdentityStore) Exists(ctx context.Context, userID string, deviceID int32) (bool, error) { + filter := bson.M{"user_id": userID, "device_id": deviceID} + count, err := s.coll.CountDocuments(ctx, filter) + if err != nil { + return false, err + } + return count > 0, nil +} \ No newline at end of file diff --git a/internal/rpc/encryption/stores/interfaces.go b/internal/rpc/encryption/stores/interfaces.go new file mode 100644 index 000000000..c9396723f --- /dev/null +++ b/internal/rpc/encryption/stores/interfaces.go @@ -0,0 +1,59 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stores + +import ( + "context" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model/signal" +) + +// IdentityStoreInterface defines the interface for identity key storage operations +type IdentityStoreInterface interface { + Create(ctx context.Context, identityKey *signal.SignalIdentityKey) error + Update(ctx context.Context, userID string, deviceID int32, identityKey *signal.SignalIdentityKey) error + Get(ctx context.Context, userID string, deviceID int32) (*signal.SignalIdentityKey, error) + Delete(ctx context.Context, userID string, deviceID int32) error + GetByUserID(ctx context.Context, userID string) ([]*signal.SignalIdentityKey, error) + Exists(ctx context.Context, userID string, deviceID int32) (bool, error) +} + +// PreKeyStoreInterface defines the interface for one-time prekey storage operations +type PreKeyStoreInterface interface { + Create(ctx context.Context, prekey *signal.SignalPreKey) error + CreateBatch(ctx context.Context, prekeys []*signal.SignalPreKey) error + GetAvailable(ctx context.Context, userID string, deviceID int32) (*signal.SignalPreKey, error) + MarkUsed(ctx context.Context, userID string, deviceID int32, keyID uint32) error + Delete(ctx context.Context, userID string, deviceID int32, keyID uint32) error + DeleteAllByUserDevice(ctx context.Context, userID string, deviceID int32) error + CountAvailable(ctx context.Context, userID string, deviceID int32) (int64, error) + GetByKeyID(ctx context.Context, userID string, deviceID int32, keyID uint32) (*signal.SignalPreKey, error) + CleanupUsed(ctx context.Context, olderThan time.Duration) (int64, error) +} + +// SignedPreKeyStoreInterface defines the interface for signed prekey storage operations +type SignedPreKeyStoreInterface interface { + Create(ctx context.Context, signedPrekey *signal.SignalSignedPreKey) error + Update(ctx context.Context, userID string, deviceID int32, keyID uint32, signedPrekey *signal.SignalSignedPreKey) error + GetActive(ctx context.Context, userID string, deviceID int32) (*signal.SignalSignedPreKey, error) + GetByKeyID(ctx context.Context, userID string, deviceID int32, keyID uint32) (*signal.SignalSignedPreKey, error) + SetActive(ctx context.Context, userID string, deviceID int32, keyID uint32) error + Delete(ctx context.Context, userID string, deviceID int32, keyID uint32) error + GetAll(ctx context.Context, userID string, deviceID int32) ([]*signal.SignalSignedPreKey, error) + CleanupInactive(ctx context.Context, olderThan time.Duration) (int64, error) + Exists(ctx context.Context, userID string, deviceID int32) (bool, error) +} + diff --git a/internal/rpc/encryption/stores/prekey_store.go b/internal/rpc/encryption/stores/prekey_store.go new file mode 100644 index 000000000..4ce440889 --- /dev/null +++ b/internal/rpc/encryption/stores/prekey_store.go @@ -0,0 +1,157 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stores + +import ( + "context" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model/signal" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/log" +) + +type PreKeyStore struct { + coll *mongo.Collection +} + +func NewPreKeyStore(db *mongo.Database) PreKeyStoreInterface { + coll := db.Collection(signal.SignalPreKeyCollection) + // Create indexes + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{{Key: "user_id", Value: 1}, {Key: "device_id", Value: 1}, {Key: "key_id", Value: 1}}, + Options: options.Index().SetUnique(true), + }, + {Keys: bson.D{{Key: "user_id", Value: 1}, {Key: "device_id", Value: 1}}}, + {Keys: bson.D{{Key: "user_id", Value: 1}, {Key: "device_id", Value: 1}, {Key: "used", Value: 1}}}, + {Keys: bson.D{{Key: "created_time", Value: 1}}}, + {Keys: bson.D{{Key: "used_time", Value: 1}}}, + }) + if err != nil { + log.ZWarn(context.Background(), "failed to create indexes for prekey store", err) + } + + return &PreKeyStore{coll: coll} +} + +func (s *PreKeyStore) Create(ctx context.Context, prekey *signal.SignalPreKey) error { + return mongoutil.InsertOne(ctx, s.coll, prekey) +} + +func (s *PreKeyStore) CreateBatch(ctx context.Context, prekeys []*signal.SignalPreKey) error { + if len(prekeys) == 0 { + return nil + } + return mongoutil.InsertMany(ctx, s.coll, prekeys) +} + +func (s *PreKeyStore) GetAvailable(ctx context.Context, userID string, deviceID int32) (*signal.SignalPreKey, error) { + filter := bson.M{ + "user_id": userID, + "device_id": deviceID, + "used": false, + } + + // Get one available prekey, sorted by creation time (FIFO) + opts := options.FindOne().SetSort(bson.D{{Key: "created_time", Value: 1}}) + prekey, err := mongoutil.FindOne[*signal.SignalPreKey](ctx, s.coll, filter, opts) + if err != nil { + return nil, err + } + + return prekey, nil +} + +func (s *PreKeyStore) MarkUsed(ctx context.Context, userID string, deviceID int32, keyID uint32) error { + filter := bson.M{ + "user_id": userID, + "device_id": deviceID, + "key_id": keyID, + } + + now := time.Now() + update := bson.M{ + "$set": bson.M{ + "used": true, + "used_time": now, + }, + } + + return mongoutil.UpdateOne(ctx, s.coll, filter, update, false) +} + +func (s *PreKeyStore) Delete(ctx context.Context, userID string, deviceID int32, keyID uint32) error { + filter := bson.M{ + "user_id": userID, + "device_id": deviceID, + "key_id": keyID, + } + + return mongoutil.DeleteOne(ctx, s.coll, filter) +} + +func (s *PreKeyStore) DeleteAllByUserDevice(ctx context.Context, userID string, deviceID int32) error { + filter := bson.M{ + "user_id": userID, + "device_id": deviceID, + } + + return mongoutil.DeleteMany(ctx, s.coll, filter) +} + +func (s *PreKeyStore) CountAvailable(ctx context.Context, userID string, deviceID int32) (int64, error) { + filter := bson.M{ + "user_id": userID, + "device_id": deviceID, + "used": false, + } + + return s.coll.CountDocuments(ctx, filter) +} + +func (s *PreKeyStore) GetByKeyID(ctx context.Context, userID string, deviceID int32, keyID uint32) (*signal.SignalPreKey, error) { + filter := bson.M{ + "user_id": userID, + "device_id": deviceID, + "key_id": keyID, + } + + prekey, err := mongoutil.FindOne[*signal.SignalPreKey](ctx, s.coll, filter) + if err != nil { + return nil, err + } + + return prekey, nil +} + +func (s *PreKeyStore) CleanupUsed(ctx context.Context, olderThan time.Duration) (int64, error) { + cutoffTime := time.Now().Add(-olderThan) + filter := bson.M{ + "used": true, + "used_time": bson.M{"$lt": cutoffTime}, + } + + result, err := s.coll.DeleteMany(ctx, filter) + if err != nil { + return 0, err + } + + return result.DeletedCount, nil +} \ No newline at end of file diff --git a/internal/rpc/encryption/stores/signed_prekey_store.go b/internal/rpc/encryption/stores/signed_prekey_store.go new file mode 100644 index 000000000..88d379f85 --- /dev/null +++ b/internal/rpc/encryption/stores/signed_prekey_store.go @@ -0,0 +1,187 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stores + +import ( + "context" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model/signal" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/log" +) + +type SignedPreKeyStore struct { + coll *mongo.Collection +} + +func NewSignedPreKeyStore(db *mongo.Database) SignedPreKeyStoreInterface { + coll := db.Collection(signal.SignalSignedPreKeyCollection) + // Create indexes + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{{Key: "user_id", Value: 1}, {Key: "device_id", Value: 1}, {Key: "key_id", Value: 1}}, + Options: options.Index().SetUnique(true), + }, + {Keys: bson.D{{Key: "user_id", Value: 1}, {Key: "device_id", Value: 1}}}, + {Keys: bson.D{{Key: "user_id", Value: 1}, {Key: "device_id", Value: 1}, {Key: "active", Value: 1}}}, + {Keys: bson.D{{Key: "active", Value: 1}, {Key: "created_time", Value: 1}}}, + {Keys: bson.D{{Key: "created_time", Value: 1}}}, + }) + if err != nil { + log.ZWarn(context.Background(), "failed to create indexes for signed prekey store", err) + } + + return &SignedPreKeyStore{coll: coll} +} + +func (s *SignedPreKeyStore) Create(ctx context.Context, signedPrekey *signal.SignalSignedPreKey) error { + return mongoutil.InsertOne(ctx, s.coll, signedPrekey) +} + +func (s *SignedPreKeyStore) Update(ctx context.Context, userID string, deviceID int32, keyID uint32, signedPrekey *signal.SignalSignedPreKey) error { + filter := bson.M{ + "user_id": userID, + "device_id": deviceID, + "key_id": keyID, + } + + update := bson.M{"$set": signedPrekey} + return mongoutil.UpdateOne(ctx, s.coll, filter, update, false) +} + +func (s *SignedPreKeyStore) GetActive(ctx context.Context, userID string, deviceID int32) (*signal.SignalSignedPreKey, error) { + filter := bson.M{ + "user_id": userID, + "device_id": deviceID, + "active": true, + } + + // Get the most recent active signed prekey + opts := options.FindOne().SetSort(bson.D{{Key: "created_time", Value: -1}}) + signedPrekey, err := mongoutil.FindOne[*signal.SignalSignedPreKey](ctx, s.coll, filter, opts) + if err != nil { + return nil, err + } + + return signedPrekey, nil +} + +func (s *SignedPreKeyStore) GetByKeyID(ctx context.Context, userID string, deviceID int32, keyID uint32) (*signal.SignalSignedPreKey, error) { + filter := bson.M{ + "user_id": userID, + "device_id": deviceID, + "key_id": keyID, + } + + signedPrekey, err := mongoutil.FindOne[*signal.SignalSignedPreKey](ctx, s.coll, filter) + if err != nil { + return nil, err + } + + return signedPrekey, nil +} + +func (s *SignedPreKeyStore) SetActive(ctx context.Context, userID string, deviceID int32, keyID uint32) error { + session, err := s.coll.Database().Client().StartSession() + if err != nil { + return err + } + defer session.EndSession(ctx) + + // Use transaction to ensure atomicity + _, err = session.WithTransaction(ctx, func(sc mongo.SessionContext) (interface{}, error) { + // First, deactivate all existing signed prekeys for this user/device + deactivateFilter := bson.M{ + "user_id": userID, + "device_id": deviceID, + } + deactivateUpdate := bson.M{ + "$set": bson.M{"active": false}, + } + _, err := s.coll.UpdateMany(sc, deactivateFilter, deactivateUpdate) + if err != nil { + return nil, err + } + + // Then, activate the specified signed prekey + activateFilter := bson.M{ + "user_id": userID, + "device_id": deviceID, + "key_id": keyID, + } + activateUpdate := bson.M{ + "$set": bson.M{"active": true}, + } + return s.coll.UpdateOne(sc, activateFilter, activateUpdate) + }) + + return err +} + +func (s *SignedPreKeyStore) Delete(ctx context.Context, userID string, deviceID int32, keyID uint32) error { + filter := bson.M{ + "user_id": userID, + "device_id": deviceID, + "key_id": keyID, + } + + return mongoutil.DeleteOne(ctx, s.coll, filter) +} + +func (s *SignedPreKeyStore) GetAll(ctx context.Context, userID string, deviceID int32) ([]*signal.SignalSignedPreKey, error) { + filter := bson.M{ + "user_id": userID, + "device_id": deviceID, + } + + // Sort by creation time, newest first + opts := options.Find().SetSort(bson.D{{Key: "created_time", Value: -1}}) + return mongoutil.Find[*signal.SignalSignedPreKey](ctx, s.coll, filter, opts) +} + +func (s *SignedPreKeyStore) CleanupInactive(ctx context.Context, olderThan time.Duration) (int64, error) { + cutoffTime := time.Now().Add(-olderThan) + filter := bson.M{ + "active": false, + "created_time": bson.M{"$lt": cutoffTime}, + } + + result, err := s.coll.DeleteMany(ctx, filter) + if err != nil { + return 0, err + } + + return result.DeletedCount, nil +} + +func (s *SignedPreKeyStore) Exists(ctx context.Context, userID string, deviceID int32) (bool, error) { + filter := bson.M{ + "user_id": userID, + "device_id": deviceID, + "active": true, + } + + count, err := s.coll.CountDocuments(ctx, filter) + if err != nil { + return false, err + } + + return count > 0, nil +} \ No newline at end of file diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 43914310e..503c3eb25 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -385,6 +385,7 @@ type RpcService struct { Auth string `yaml:"auth"` Conversation string `yaml:"conversation"` Third string `yaml:"third"` + Encryption string `yaml:"encryption"` } func (r *RpcService) GetServiceNames() []string { @@ -398,6 +399,7 @@ func (r *RpcService) GetServiceNames() []string { r.Auth, r.Conversation, r.Third, + r.Encryption, } } diff --git a/pkg/common/storage/model/signal/signal_identity_keys.go b/pkg/common/storage/model/signal/signal_identity_keys.go new file mode 100644 index 000000000..0be046a3f --- /dev/null +++ b/pkg/common/storage/model/signal/signal_identity_keys.go @@ -0,0 +1,67 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signal + +import ( + "context" + "time" +) + +const ( + SignalIdentityKeyCollection = "signal_identity_keys" +) + +// SignalIdentityKey represents the identity key for Signal protocol +type SignalIdentityKey struct { + UserID string `bson:"user_id" json:"userID"` + DeviceID int32 `bson:"device_id" json:"deviceID"` + IdentityKey []byte `bson:"identity_key" json:"identityKey"` + RegistrationID int32 `bson:"registration_id" json:"registrationID"` + CreatedTime time.Time `bson:"created_time" json:"createdTime"` + UpdatedTime time.Time `bson:"updated_time" json:"updatedTime"` +} + +type SignalIdentityKeyModelInterface interface { + // Create creates a new identity key record + Create(ctx context.Context, identityKey *SignalIdentityKey) error + + // Update updates an existing identity key + Update(ctx context.Context, userID string, deviceID int32, identityKey *SignalIdentityKey) error + + // Get retrieves an identity key by user ID and device ID + Get(ctx context.Context, userID string, deviceID int32) (*SignalIdentityKey, error) + + // Delete removes an identity key + Delete(ctx context.Context, userID string, deviceID int32) error + + // GetByUserID retrieves all identity keys for a user + GetByUserID(ctx context.Context, userID string) ([]*SignalIdentityKey, error) + + // Exists checks if an identity key exists + Exists(ctx context.Context, userID string, deviceID int32) (bool, error) +} + +func (SignalIdentityKey) TableName() string { + return SignalIdentityKeyCollection +} + +// Indexes returns the indexes for the collection +func (SignalIdentityKey) Indexes() []string { + return []string{ + "user_id", + "user_id_device_id", // compound index for (user_id, device_id) + "created_time", + } +} \ No newline at end of file diff --git a/pkg/common/storage/model/signal/signal_prekeys.go b/pkg/common/storage/model/signal/signal_prekeys.go new file mode 100644 index 000000000..e27cd4662 --- /dev/null +++ b/pkg/common/storage/model/signal/signal_prekeys.go @@ -0,0 +1,80 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signal + +import ( + "context" + "time" +) + +const ( + SignalPreKeyCollection = "signal_prekeys" +) + +// SignalPreKey represents one-time prekeys for Signal protocol +type SignalPreKey struct { + UserID string `bson:"user_id" json:"userID"` + DeviceID int32 `bson:"device_id" json:"deviceID"` + KeyID uint32 `bson:"key_id" json:"keyID"` + PublicKey []byte `bson:"public_key" json:"publicKey"` + Used bool `bson:"used" json:"used"` + CreatedTime time.Time `bson:"created_time" json:"createdTime"` + UsedTime *time.Time `bson:"used_time,omitempty" json:"usedTime,omitempty"` +} + +type SignalPreKeyModelInterface interface { + // Create creates a new prekey record + Create(ctx context.Context, prekey *SignalPreKey) error + + // CreateBatch creates multiple prekey records in batch + CreateBatch(ctx context.Context, prekeys []*SignalPreKey) error + + // GetAvailable retrieves an available (unused) prekey for a user/device + GetAvailable(ctx context.Context, userID string, deviceID int32) (*SignalPreKey, error) + + // MarkUsed marks a prekey as used + MarkUsed(ctx context.Context, userID string, deviceID int32, keyID uint32) error + + // Delete removes a prekey + Delete(ctx context.Context, userID string, deviceID int32, keyID uint32) error + + // DeleteAllByUserDevice removes all prekeys for a user/device + DeleteAllByUserDevice(ctx context.Context, userID string, deviceID int32) error + + // CountAvailable returns the count of available prekeys for a user/device + CountAvailable(ctx context.Context, userID string, deviceID int32) (int64, error) + + // GetByKeyID retrieves a specific prekey by key ID + GetByKeyID(ctx context.Context, userID string, deviceID int32, keyID uint32) (*SignalPreKey, error) + + // CleanupUsed removes used prekeys older than the specified duration + CleanupUsed(ctx context.Context, olderThan time.Duration) (int64, error) +} + +func (SignalPreKey) TableName() string { + return SignalPreKeyCollection +} + +// Indexes returns the indexes for the collection +func (SignalPreKey) Indexes() []string { + return []string{ + "user_id", + "user_id_device_id", // compound index for (user_id, device_id) + "user_id_device_id_key_id", // compound unique index for (user_id, device_id, key_id) + "user_id_device_id_used", // compound index for (user_id, device_id, used) + "used_time", + "created_time", + } +} \ No newline at end of file diff --git a/pkg/common/storage/model/signal/signal_signed_prekeys.go b/pkg/common/storage/model/signal/signal_signed_prekeys.go new file mode 100644 index 000000000..60f62321c --- /dev/null +++ b/pkg/common/storage/model/signal/signal_signed_prekeys.go @@ -0,0 +1,80 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signal + +import ( + "context" + "time" +) + +const ( + SignalSignedPreKeyCollection = "signal_signed_prekeys" +) + +// SignalSignedPreKey represents signed prekeys for Signal protocol +type SignalSignedPreKey struct { + UserID string `bson:"user_id" json:"userID"` + DeviceID int32 `bson:"device_id" json:"deviceID"` + KeyID uint32 `bson:"key_id" json:"keyID"` + PublicKey []byte `bson:"public_key" json:"publicKey"` + Signature []byte `bson:"signature" json:"signature"` + CreatedTime time.Time `bson:"created_time" json:"createdTime"` + Active bool `bson:"active" json:"active"` // Whether this is the active signed prekey +} + +type SignalSignedPreKeyModelInterface interface { + // Create creates a new signed prekey record + Create(ctx context.Context, signedPrekey *SignalSignedPreKey) error + + // Update updates an existing signed prekey (for rotation) + Update(ctx context.Context, userID string, deviceID int32, keyID uint32, signedPrekey *SignalSignedPreKey) error + + // GetActive retrieves the active signed prekey for a user/device + GetActive(ctx context.Context, userID string, deviceID int32) (*SignalSignedPreKey, error) + + // GetByKeyID retrieves a specific signed prekey by key ID + GetByKeyID(ctx context.Context, userID string, deviceID int32, keyID uint32) (*SignalSignedPreKey, error) + + // SetActive marks a signed prekey as active and deactivates others + SetActive(ctx context.Context, userID string, deviceID int32, keyID uint32) error + + // Delete removes a signed prekey + Delete(ctx context.Context, userID string, deviceID int32, keyID uint32) error + + // GetAll retrieves all signed prekeys for a user/device + GetAll(ctx context.Context, userID string, deviceID int32) ([]*SignalSignedPreKey, error) + + // CleanupInactive removes inactive signed prekeys older than the specified duration + CleanupInactive(ctx context.Context, olderThan time.Duration) (int64, error) + + // Exists checks if a signed prekey exists + Exists(ctx context.Context, userID string, deviceID int32) (bool, error) +} + +func (SignalSignedPreKey) TableName() string { + return SignalSignedPreKeyCollection +} + +// Indexes returns the indexes for the collection +func (SignalSignedPreKey) Indexes() []string { + return []string{ + "user_id", + "user_id_device_id", // compound index for (user_id, device_id) + "user_id_device_id_key_id", // compound unique index for (user_id, device_id, key_id) + "user_id_device_id_active", // compound index for (user_id, device_id, active) + "active_created_time", // compound index for (active, created_time) + "created_time", + } +} \ No newline at end of file diff --git a/start-config.yml b/start-config.yml index da959044d..88ca5d022 100644 --- a/start-config.yml +++ b/start-config.yml @@ -11,6 +11,7 @@ serviceBinaries: openim-rpc-friend: 1 openim-rpc-msg: 1 openim-rpc-third: 1 + openim-rpc-encryption: 1 toolBinaries: - check-free-memory - check-component