feat: add end-to-end encryption support

pull/3491/head
wujunhui 2 months ago
parent 5e813ef079
commit b4c925e910

@ -0,0 +1,79 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"path/filepath"
"github.com/openimsdk/open-im-server/v3/internal/rpc/encryption"
"gopkg.in/yaml.v3"
)
func main() {
var configPath = flag.String("config", "./config", "path to config directory")
flag.Parse()
// Load encryption service config independently
config, err := loadEncryptionConfig(*configPath)
if err != nil {
log.Fatalf("Failed to load encryption config: %v", err)
}
// Start the encryption service
ctx := context.Background()
if err := encryption.Start(ctx, config); err != nil {
log.Fatalf("Failed to start encryption service: %v", err)
}
}
// loadEncryptionConfig loads configuration from multiple files following OpenIM pattern
func loadEncryptionConfig(configDir string) (*encryption.Config, error) {
config := &encryption.Config{}
// Load main encryption config (only contains rpc, prometheus, and encryption-specific settings)
encryptionConfigFile := filepath.Join(configDir, "openim-rpc-encryption.yml")
if err := loadYAMLFile(encryptionConfigFile, config); err != nil {
return nil, fmt.Errorf("failed to load encryption config: %w", err)
}
// Load shared MongoDB configuration
mongoConfigFile := filepath.Join(configDir, "mongodb.yml")
if err := loadYAMLFile(mongoConfigFile, &config.MongodbConfig); err != nil {
return nil, fmt.Errorf("failed to load mongodb config: %w", err)
}
// Load shared Discovery configuration
discoveryConfigFile := filepath.Join(configDir, "discovery.yml")
if err := loadYAMLFile(discoveryConfigFile, &config.Discovery); err != nil {
return nil, fmt.Errorf("failed to load discovery config: %w", err)
}
return config, nil
}
// loadYAMLFile loads a YAML file into the given struct
func loadYAMLFile(filename string, out interface{}) error {
data, err := os.ReadFile(filename)
if err != nil {
return err
}
return yaml.Unmarshal(data, out)
}

@ -20,3 +20,4 @@ rpcService:
auth: auth-rpc-service auth: auth-rpc-service
conversation: conversation-rpc-service conversation: conversation-rpc-service
third: third-rpc-service third: third-rpc-service
encryption: encryption-rpc-service

@ -0,0 +1,59 @@
# Copyright © 2024 OpenIM. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# RPC Configuration (following OpenIM standard format)
rpc:
# API or other RPCs can access this RPC through this IP; if left blank, the internal network IP is obtained by default
registerIP:
# Listening IP; 0.0.0.0 means both internal and external IPs are listened to, if blank, the internal network IP is automatically obtained by default
listenIP: 0.0.0.0
# autoSetPorts indicates whether to automatically set the ports
# if you use in kubernetes, set it to false
autoSetPorts: true
# List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
# It will only take effect when autoSetPorts is set to false.
ports: [ 10800 ]
prometheus:
# Whether to enable prometheus
enable: true
# Prometheus listening ports, must be consistent with the number of rpc.ports
# It will only take effect when autoSetPorts is set to false.
ports: [ 20800 ]
# Encryption Configuration
encryption:
# Encryption mode: "aes", "signal", "hybrid"
mode: "aes" # Default to AES for compatibility
# AES encryption settings (existing)
aes:
enabled: true
# Signal Protocol settings
signal:
enabled: false # Set to true when ready to use Signal Protocol
preKeyBatch: 100 # Max one-time prekeys per upload
keyRotationInterval: "168h" # 7 days
sessionCleanupInterval: "720h" # 30 days
prekeyCleanupInterval: "168h" # 7 days
# Security settings
maxOneTimePreKeys: 100
maxSessionsPerDevice: 1000
# Validation settings
validateSignatures: true
requireIdentityKeys: true

@ -0,0 +1,111 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package encryption
import (
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
// PrometheusConfig contains Prometheus configuration
type PrometheusConfig struct {
Enable bool `yaml:"enable"`
Ports []int `yaml:"ports"`
}
// Config represents the configuration for encryption service
type Config struct {
RpcConfig config.RPC `yaml:"rpc"`
MongodbConfig config.Mongo `yaml:"mongo"`
Discovery config.Discovery `yaml:"discovery"`
PrometheusConfig PrometheusConfig `yaml:"prometheus"`
EncryptionConfig EncryptionConfig `yaml:"encryption"`
}
// EncryptionConfig contains encryption-specific configuration
type EncryptionConfig struct {
Mode string `yaml:"mode"` // "aes", "signal", "hybrid"
AES AESConfig `yaml:"aes"`
Signal SignalConfig `yaml:"signal"`
}
// AESConfig contains AES encryption configuration
type AESConfig struct {
Enabled bool `yaml:"enabled"`
}
// SignalConfig contains Signal Protocol configuration
type SignalConfig struct {
Enabled bool `yaml:"enabled"`
PreKeyBatch int `yaml:"preKeyBatch"`
KeyRotationInterval time.Duration `yaml:"keyRotationInterval"`
SessionCleanupInterval time.Duration `yaml:"sessionCleanupInterval"`
PrekeyCleanupInterval time.Duration `yaml:"prekeyCleanupInterval"`
// Security settings
MaxOneTimePreKeys int `yaml:"maxOneTimePreKeys"`
MaxSessionsPerDevice int `yaml:"maxSessionsPerDevice"`
// Validation settings
ValidateSignatures bool `yaml:"validateSignatures"`
RequireIdentityKeys bool `yaml:"requireIdentityKeys"`
}
// GetEncryptionMode returns the current encryption mode
func (c *Config) GetEncryptionMode() string {
if c.EncryptionConfig.Mode == "" {
return "aes" // default to AES for compatibility
}
return c.EncryptionConfig.Mode
}
// IsSignalEnabled returns true if Signal Protocol is enabled
func (c *Config) IsSignalEnabled() bool {
return c.EncryptionConfig.Signal.Enabled &&
(c.EncryptionConfig.Mode == "signal" || c.EncryptionConfig.Mode == "hybrid")
}
// IsAESEnabled returns true if AES encryption is enabled
func (c *Config) IsAESEnabled() bool {
return c.EncryptionConfig.AES.Enabled ||
c.EncryptionConfig.Mode == "aes" ||
c.EncryptionConfig.Mode == "hybrid"
}
// GetSignalConfig returns Signal Protocol configuration
func (c *Config) GetSignalConfig() *SignalConfig {
// Set defaults if not specified
if c.EncryptionConfig.Signal.PreKeyBatch == 0 {
c.EncryptionConfig.Signal.PreKeyBatch = 100
}
if c.EncryptionConfig.Signal.KeyRotationInterval == 0 {
c.EncryptionConfig.Signal.KeyRotationInterval = 7 * 24 * time.Hour // 7 days
}
if c.EncryptionConfig.Signal.SessionCleanupInterval == 0 {
c.EncryptionConfig.Signal.SessionCleanupInterval = 30 * 24 * time.Hour // 30 days
}
if c.EncryptionConfig.Signal.PrekeyCleanupInterval == 0 {
c.EncryptionConfig.Signal.PrekeyCleanupInterval = 7 * 24 * time.Hour // 7 days
}
if c.EncryptionConfig.Signal.MaxOneTimePreKeys == 0 {
c.EncryptionConfig.Signal.MaxOneTimePreKeys = 100
}
if c.EncryptionConfig.Signal.MaxSessionsPerDevice == 0 {
c.EncryptionConfig.Signal.MaxSessionsPerDevice = 1000
}
return &c.EncryptionConfig.Signal
}

@ -0,0 +1,348 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package encryption
import (
"encoding/base64"
"net/http"
"strconv"
"github.com/gin-gonic/gin"
"github.com/openimsdk/tools/log"
)
// Request/Response structures for HTTP API
type GetPreKeysResponse struct {
IdentityKey *IdentityKeyInfo `json:"identityKey"`
SignedPreKey *SignedPreKeyInfo `json:"signedPreKey"`
OneTimePreKey *PreKeyInfo `json:"oneTimePreKey,omitempty"`
RegistrationID int32 `json:"registrationId"`
}
type IdentityKeyInfo struct {
IdentityKey string `json:"identityKey"`
RegistrationID int32 `json:"registrationId"`
CreatedTime int64 `json:"createdTime"`
}
type PreKeyInfo struct {
KeyID uint32 `json:"keyId"`
PublicKey string `json:"publicKey"`
}
type SignedPreKeyInfo struct {
KeyID uint32 `json:"keyId"`
PublicKey string `json:"publicKey"`
Signature string `json:"signature"`
CreatedTime int64 `json:"createdTime"`
}
type SetPreKeysRequest struct {
IdentityKey string `json:"identityKey,omitempty"`
SignedPreKey *SignedPreKeyInfo `json:"signedPreKey,omitempty"`
OneTimePreKeys []PreKeyInfo `json:"oneTimePreKeys,omitempty"`
RegistrationID int32 `json:"registrationId,omitempty"`
}
type APIResponse struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
// GetPreKeys handles GET /api/v1/encryption/prekeys/:user_id/:device_id
func (s *Server) GetPreKeys(c *gin.Context) {
userID := c.Param("user_id")
deviceIDStr := c.Param("device_id")
deviceID, err := strconv.ParseInt(deviceIDStr, 10, 32)
if err != nil {
c.JSON(http.StatusBadRequest, APIResponse{
Code: 400,
Message: "Invalid device_id",
})
return
}
log.ZInfo(c.Request.Context(), "GetPreKeys", "userID", userID, "deviceID", deviceID)
// Get identity key
identityKey, err := s.keysManager.GetIdentityKey(c.Request.Context(), userID, int32(deviceID))
if err != nil {
log.ZError(c.Request.Context(), "failed to get identity key", err)
c.JSON(http.StatusNotFound, APIResponse{
Code: 404,
Message: "Identity key not found",
})
return
}
// Get signed prekey
signedPreKey, err := s.keysManager.GetActiveSignedPreKey(c.Request.Context(), userID, int32(deviceID))
if err != nil {
log.ZError(c.Request.Context(), "failed to get signed prekey", err)
c.JSON(http.StatusNotFound, APIResponse{
Code: 404,
Message: "Signed prekey not found",
})
return
}
// Get one-time prekey (optional)
oneTimePreKey, err := s.keysManager.GetOneTimePreKey(c.Request.Context(), userID, int32(deviceID))
if err != nil {
log.ZWarn(c.Request.Context(), "no one-time prekey available", err)
oneTimePreKey = nil
}
response := &GetPreKeysResponse{
IdentityKey: &IdentityKeyInfo{
IdentityKey: base64.StdEncoding.EncodeToString(identityKey.IdentityKey),
RegistrationID: identityKey.RegistrationID,
CreatedTime: identityKey.CreatedTime.Unix(),
},
SignedPreKey: &SignedPreKeyInfo{
KeyID: signedPreKey.KeyID,
PublicKey: base64.StdEncoding.EncodeToString(signedPreKey.PublicKey),
Signature: base64.StdEncoding.EncodeToString(signedPreKey.Signature),
CreatedTime: signedPreKey.CreatedTime.Unix(),
},
RegistrationID: identityKey.RegistrationID,
}
if oneTimePreKey != nil {
response.OneTimePreKey = &PreKeyInfo{
KeyID: oneTimePreKey.KeyID,
PublicKey: base64.StdEncoding.EncodeToString(oneTimePreKey.PublicKey),
}
}
c.JSON(http.StatusOK, APIResponse{
Code: 0,
Message: "success",
Data: response,
})
}
// SetPreKeys handles POST /api/v1/encryption/prekeys/:user_id/:device_id
func (s *Server) SetPreKeys(c *gin.Context) {
userID := c.Param("user_id")
deviceIDStr := c.Param("device_id")
deviceID, err := strconv.ParseInt(deviceIDStr, 10, 32)
if err != nil {
c.JSON(http.StatusBadRequest, APIResponse{
Code: 400,
Message: "Invalid device_id",
})
return
}
var req SetPreKeysRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, APIResponse{
Code: 400,
Message: "Invalid request body",
})
return
}
log.ZInfo(c.Request.Context(), "SetPreKeys", "userID", userID, "deviceID", deviceID)
// Set identity key if provided
if req.IdentityKey != "" {
identityKeyBytes, err := base64.StdEncoding.DecodeString(req.IdentityKey)
if err != nil {
c.JSON(http.StatusBadRequest, APIResponse{
Code: 400,
Message: "Invalid identity key encoding",
})
return
}
err = s.keysManager.SetIdentityKey(c.Request.Context(), userID, int32(deviceID), identityKeyBytes, req.RegistrationID)
if err != nil {
log.ZError(c.Request.Context(), "failed to set identity key", err)
c.JSON(http.StatusInternalServerError, APIResponse{
Code: 500,
Message: "Failed to set identity key",
})
return
}
}
// Set signed prekey if provided
if req.SignedPreKey != nil {
publicKeyBytes, err := base64.StdEncoding.DecodeString(req.SignedPreKey.PublicKey)
if err != nil {
c.JSON(http.StatusBadRequest, APIResponse{
Code: 400,
Message: "Invalid signed prekey public key encoding",
})
return
}
signatureBytes, err := base64.StdEncoding.DecodeString(req.SignedPreKey.Signature)
if err != nil {
c.JSON(http.StatusBadRequest, APIResponse{
Code: 400,
Message: "Invalid signed prekey signature encoding",
})
return
}
signedPreKeyData := &SignedPreKeyResponse{
KeyId: req.SignedPreKey.KeyID,
PublicKey: publicKeyBytes,
Signature: signatureBytes,
}
err = s.keysManager.SetSignedPreKey(c.Request.Context(), userID, int32(deviceID), signedPreKeyData)
if err != nil {
log.ZError(c.Request.Context(), "failed to set signed prekey", err)
c.JSON(http.StatusInternalServerError, APIResponse{
Code: 500,
Message: "Failed to set signed prekey",
})
return
}
}
// Set one-time prekeys
if len(req.OneTimePreKeys) > 0 {
var preKeyData []*PreKeyResponse
for _, pk := range req.OneTimePreKeys {
publicKeyBytes, err := base64.StdEncoding.DecodeString(pk.PublicKey)
if err != nil {
c.JSON(http.StatusBadRequest, APIResponse{
Code: 400,
Message: "Invalid one-time prekey public key encoding",
})
return
}
preKeyData = append(preKeyData, &PreKeyResponse{
KeyId: pk.KeyID,
PublicKey: publicKeyBytes,
})
}
acceptedCount, err := s.keysManager.SetOneTimePreKeys(c.Request.Context(), userID, int32(deviceID), preKeyData)
if err != nil {
log.ZError(c.Request.Context(), "failed to set one-time prekeys", err)
c.JSON(http.StatusInternalServerError, APIResponse{
Code: 500,
Message: "Failed to set one-time prekeys",
})
return
}
c.JSON(http.StatusOK, APIResponse{
Code: 0,
Message: "success",
Data: map[string]interface{}{
"preKeysAccepted": acceptedCount,
},
})
return
}
c.JSON(http.StatusOK, APIResponse{
Code: 0,
Message: "success",
})
}
// GetPreKeyCount handles GET /api/v1/encryption/prekeys/:user_id/:device_id/count
func (s *Server) GetPreKeyCount(c *gin.Context) {
userID := c.Param("user_id")
deviceIDStr := c.Param("device_id")
deviceID, err := strconv.ParseInt(deviceIDStr, 10, 32)
if err != nil {
c.JSON(http.StatusBadRequest, APIResponse{
Code: 400,
Message: "Invalid device_id",
})
return
}
count, err := s.keysManager.GetPreKeyCount(c.Request.Context(), userID, int32(deviceID))
if err != nil {
log.ZError(c.Request.Context(), "failed to get prekey count", err)
c.JSON(http.StatusInternalServerError, APIResponse{
Code: 500,
Message: "Failed to get prekey count",
})
return
}
signedPreKeyExists, lastRotation, err := s.keysManager.GetSignedPreKeyInfo(c.Request.Context(), userID, int32(deviceID))
if err != nil {
signedPreKeyExists = false
}
data := map[string]interface{}{
"oneTimePreKeyCount": count,
"signedPreKeyExists": signedPreKeyExists,
}
if !lastRotation.IsZero() {
data["lastSignedPreKeyRotation"] = lastRotation.Unix()
}
c.JSON(http.StatusOK, APIResponse{
Code: 0,
Message: "success",
Data: data,
})
}
// GetIdentityKey handles GET /api/v1/encryption/identity/:user_id/:device_id
func (s *Server) GetIdentityKey(c *gin.Context) {
userID := c.Param("user_id")
deviceIDStr := c.Param("device_id")
deviceID, err := strconv.ParseInt(deviceIDStr, 10, 32)
if err != nil {
c.JSON(http.StatusBadRequest, APIResponse{
Code: 400,
Message: "Invalid device_id",
})
return
}
identityKey, err := s.keysManager.GetIdentityKey(c.Request.Context(), userID, int32(deviceID))
if err != nil {
log.ZError(c.Request.Context(), "failed to get identity key", err)
c.JSON(http.StatusNotFound, APIResponse{
Code: 404,
Message: "Identity key not found",
})
return
}
response := &IdentityKeyInfo{
IdentityKey: base64.StdEncoding.EncodeToString(identityKey.IdentityKey),
RegistrationID: identityKey.RegistrationID,
CreatedTime: identityKey.CreatedTime.Unix(),
}
c.JSON(http.StatusOK, APIResponse{
Code: 0,
Message: "success",
Data: response,
})
}

@ -0,0 +1,287 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package encryption
import (
"context"
"fmt"
"time"
"github.com/openimsdk/open-im-server/v3/internal/rpc/encryption/stores"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model/signal"
"github.com/openimsdk/tools/log"
)
const (
// Maximum number of one-time prekeys that can be uploaded at once
MaxOneTimePreKeys = 100
)
type KeysManager struct {
identityStore stores.IdentityStoreInterface
preKeyStore stores.PreKeyStoreInterface
signedPreKeyStore stores.SignedPreKeyStoreInterface
}
func NewKeysManager(
identityStore stores.IdentityStoreInterface,
preKeyStore stores.PreKeyStoreInterface,
signedPreKeyStore stores.SignedPreKeyStoreInterface,
) *KeysManager {
return &KeysManager{
identityStore: identityStore,
preKeyStore: preKeyStore,
signedPreKeyStore: signedPreKeyStore,
}
}
// GetIdentityKey retrieves the identity key for a user/device
func (km *KeysManager) GetIdentityKey(ctx context.Context, userID string, deviceID int32) (*signal.SignalIdentityKey, error) {
return km.identityStore.Get(ctx, userID, deviceID)
}
// SetIdentityKey sets the identity key for a user/device
func (km *KeysManager) SetIdentityKey(ctx context.Context, userID string, deviceID int32, identityKey []byte, registrationID int32) error {
now := time.Now()
// Check if identity key already exists
existing, err := km.identityStore.Get(ctx, userID, deviceID)
if err == nil && existing != nil {
// Update existing identity key
existing.IdentityKey = identityKey
existing.RegistrationID = registrationID
existing.UpdatedTime = now
return km.identityStore.Update(ctx, userID, deviceID, existing)
}
// Create new identity key
identityKeyRecord := &signal.SignalIdentityKey{
UserID: userID,
DeviceID: deviceID,
IdentityKey: identityKey,
RegistrationID: registrationID,
CreatedTime: now,
UpdatedTime: now,
}
return km.identityStore.Create(ctx, identityKeyRecord)
}
// GetActiveSignedPreKey retrieves the active signed prekey for a user/device
func (km *KeysManager) GetActiveSignedPreKey(ctx context.Context, userID string, deviceID int32) (*signal.SignalSignedPreKey, error) {
return km.signedPreKeyStore.GetActive(ctx, userID, deviceID)
}
// SetSignedPreKey sets a signed prekey for a user/device
func (km *KeysManager) SetSignedPreKey(ctx context.Context, userID string, deviceID int32, signedPreKey *SignedPreKeyResponse) error {
now := time.Now()
signedPreKeyRecord := &signal.SignalSignedPreKey{
UserID: userID,
DeviceID: deviceID,
KeyID: signedPreKey.KeyId,
PublicKey: signedPreKey.PublicKey,
Signature: signedPreKey.Signature,
CreatedTime: now,
Active: true,
}
// Deactivate existing signed prekeys and set this one as active
err := km.signedPreKeyStore.SetActive(ctx, userID, deviceID, signedPreKey.KeyId)
if err != nil {
log.ZWarn(ctx, "failed to deactivate existing signed prekeys", err)
}
// Create or update the signed prekey
existing, err := km.signedPreKeyStore.GetByKeyID(ctx, userID, deviceID, signedPreKey.KeyId)
if err == nil && existing != nil {
return km.signedPreKeyStore.Update(ctx, userID, deviceID, signedPreKey.KeyId, signedPreKeyRecord)
}
return km.signedPreKeyStore.Create(ctx, signedPreKeyRecord)
}
// GetOneTimePreKey retrieves an available one-time prekey and marks it as used
func (km *KeysManager) GetOneTimePreKey(ctx context.Context, userID string, deviceID int32) (*signal.SignalPreKey, error) {
preKey, err := km.preKeyStore.GetAvailable(ctx, userID, deviceID)
if err != nil {
return nil, fmt.Errorf("no available one-time prekey: %w", err)
}
// Mark the prekey as used
err = km.preKeyStore.MarkUsed(ctx, userID, deviceID, preKey.KeyID)
if err != nil {
log.ZError(ctx, "failed to mark prekey as used", err, "userID", userID, "deviceID", deviceID, "keyID", preKey.KeyID)
// Don't fail the request, but log the error
}
return preKey, nil
}
// SetOneTimePreKeys sets multiple one-time prekeys for a user/device
func (km *KeysManager) SetOneTimePreKeys(ctx context.Context, userID string, deviceID int32, preKeys []*PreKeyResponse) (int, error) {
if len(preKeys) > MaxOneTimePreKeys {
return 0, fmt.Errorf("too many one-time prekeys: %d (max: %d)", len(preKeys), MaxOneTimePreKeys)
}
now := time.Now()
var preKeyRecords []*signal.SignalPreKey
for _, preKey := range preKeys {
preKeyRecord := &signal.SignalPreKey{
UserID: userID,
DeviceID: deviceID,
KeyID: preKey.KeyId,
PublicKey: preKey.PublicKey,
Used: false,
CreatedTime: now,
}
preKeyRecords = append(preKeyRecords, preKeyRecord)
}
err := km.preKeyStore.CreateBatch(ctx, preKeyRecords)
if err != nil {
return 0, fmt.Errorf("failed to create one-time prekeys: %w", err)
}
return len(preKeyRecords), nil
}
// GetPreKeyCount returns the count of available one-time prekeys for a user/device
func (km *KeysManager) GetPreKeyCount(ctx context.Context, userID string, deviceID int32) (int64, error) {
return km.preKeyStore.CountAvailable(ctx, userID, deviceID)
}
// GetSignedPreKeyInfo returns information about signed prekey existence and last rotation
func (km *KeysManager) GetSignedPreKeyInfo(ctx context.Context, userID string, deviceID int32) (exists bool, lastRotation time.Time, err error) {
signedPreKey, err := km.signedPreKeyStore.GetActive(ctx, userID, deviceID)
if err != nil {
return false, time.Time{}, err
}
if signedPreKey != nil {
return true, signedPreKey.CreatedTime, nil
}
return false, time.Time{}, nil
}
// CleanupExpiredKeys removes expired keys from storage
func (km *KeysManager) CleanupExpiredKeys(ctx context.Context) error {
// Cleanup used one-time prekeys older than 7 days
usedPreKeysCleanedCount, err := km.preKeyStore.CleanupUsed(ctx, 7*24*time.Hour)
if err != nil {
log.ZError(ctx, "failed to cleanup used prekeys", err)
} else {
log.ZInfo(ctx, "cleaned up used prekeys", "count", usedPreKeysCleanedCount)
}
// Cleanup inactive signed prekeys older than 30 days
inactiveSignedPreKeysCount, err := km.signedPreKeyStore.CleanupInactive(ctx, 30*24*time.Hour)
if err != nil {
log.ZError(ctx, "failed to cleanup inactive signed prekeys", err)
} else {
log.ZInfo(ctx, "cleaned up inactive signed prekeys", "count", inactiveSignedPreKeysCount)
}
return nil
}
// ValidateSignedPreKey validates the signature of a signed prekey
func (km *KeysManager) ValidateSignedPreKey(ctx context.Context, userID string, deviceID int32, signedPreKey *SignedPreKeyResponse) error {
// Get the identity key to verify the signature
_, err := km.GetIdentityKey(ctx, userID, deviceID)
if err != nil {
return fmt.Errorf("failed to get identity key for signature validation: %w", err)
}
// TODO: Implement actual signature validation using Signal Protocol
// This would require integrating with the Signal Protocol library
// For now, we'll skip the validation
log.ZInfo(ctx, "signed prekey signature validation skipped (not implemented)",
"userID", userID, "deviceID", deviceID, "keyID", signedPreKey.KeyId)
return nil
}
// RotateSignedPreKey creates a new signed prekey and deactivates the old one
func (km *KeysManager) RotateSignedPreKey(ctx context.Context, userID string, deviceID int32, newSignedPreKey *SignedPreKeyResponse) error {
// Validate the new signed prekey
err := km.ValidateSignedPreKey(ctx, userID, deviceID, newSignedPreKey)
if err != nil {
return fmt.Errorf("signed prekey validation failed: %w", err)
}
// Set the new signed prekey (this will automatically deactivate the old one)
return km.SetSignedPreKey(ctx, userID, deviceID, newSignedPreKey)
}
// GetPreKeyBundleForUser retrieves all necessary keys for X3DH key agreement
func (km *KeysManager) GetPreKeyBundleForUser(ctx context.Context, userID string, deviceID int32) (map[string]interface{}, error) {
// Get identity key
identityKey, err := km.GetIdentityKey(ctx, userID, deviceID)
if err != nil {
return nil, fmt.Errorf("failed to get identity key: %w", err)
}
// Get signed prekey
signedPreKey, err := km.GetActiveSignedPreKey(ctx, userID, deviceID)
if err != nil {
return nil, fmt.Errorf("failed to get signed prekey: %w", err)
}
// Get one-time prekey (optional)
oneTimePreKey, err := km.GetOneTimePreKey(ctx, userID, deviceID)
if err != nil {
log.ZWarn(ctx, "no one-time prekey available", err)
oneTimePreKey = nil
}
bundle := map[string]interface{}{
"identityKey": map[string]interface{}{
"identityKey": identityKey.IdentityKey,
"registrationId": identityKey.RegistrationID,
"createdTime": identityKey.CreatedTime.Unix(),
},
"signedPreKey": map[string]interface{}{
"keyId": signedPreKey.KeyID,
"publicKey": signedPreKey.PublicKey,
"signature": signedPreKey.Signature,
"createdTime": signedPreKey.CreatedTime.Unix(),
},
"registrationId": identityKey.RegistrationID,
}
if oneTimePreKey != nil {
bundle["oneTimePreKey"] = map[string]interface{}{
"keyId": oneTimePreKey.KeyID,
"publicKey": oneTimePreKey.PublicKey,
}
}
return bundle, nil
}
// Response structures used in key manager
type SignedPreKeyResponse struct {
KeyId uint32 `json:"keyId"`
PublicKey []byte `json:"publicKey"`
Signature []byte `json:"signature"`
}
type PreKeyResponse struct {
KeyId uint32 `json:"keyId"`
PublicKey []byte `json:"publicKey"`
}

@ -0,0 +1,178 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package encryption
import (
"context"
"fmt"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/openimsdk/open-im-server/v3/internal/rpc/encryption/stores"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/log"
)
type Server struct {
*Config
keysManager *KeysManager
discoveryConn discovery.Conn
httpServer *http.Server
}
func Start(ctx context.Context, cfg *Config) error {
log.ZInfo(ctx, "encryption server start")
// Initialize service registry
client, err := kdisc.NewDiscoveryRegister(&cfg.Discovery, nil)
if err != nil {
return err
}
// Initialize MongoDB
mongoClient, err := mongoutil.NewMongoDB(ctx, cfg.MongodbConfig.Build())
if err != nil {
return err
}
// Get the specific database
db := mongoClient.GetDB()
// Initialize stores
identityStore := stores.NewIdentityStore(db)
preKeyStore := stores.NewPreKeyStore(db)
signedPreKeyStore := stores.NewSignedPreKeyStore(db)
// Initialize managers
keysManager := NewKeysManager(identityStore, preKeyStore, signedPreKeyStore)
server := &Server{
Config: cfg,
keysManager: keysManager,
discoveryConn: client,
}
// Setup HTTP server
if err := server.setupHTTPServer(); err != nil {
return err
}
// Start HTTP server
go func() {
if err := server.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.ZError(ctx, "HTTP server failed to start", err)
}
}()
log.ZInfo(ctx, "encryption server started successfully", "port", cfg.RpcConfig.Ports[0])
// Keep the service running
select {
case <-ctx.Done():
return server.shutdown(ctx)
}
}
func (s *Server) setupHTTPServer() error {
gin.SetMode(gin.ReleaseMode)
router := gin.Default()
// Add middleware
router.Use(gin.Recovery())
router.Use(s.corsMiddleware())
router.Use(s.loggingMiddleware())
// API routes
api := router.Group("/api/v1/encryption")
{
// Key management endpoints
api.GET("/prekeys/:user_id/:device_id", s.GetPreKeys)
api.POST("/prekeys/:user_id/:device_id", s.SetPreKeys)
api.GET("/prekeys/:user_id/:device_id/count", s.GetPreKeyCount)
api.GET("/identity/:user_id/:device_id", s.GetIdentityKey)
// Health check
api.GET("/health", s.HealthCheck)
}
// Create HTTP server
port := fmt.Sprintf(":%d", s.Config.RpcConfig.Ports[0])
s.httpServer = &http.Server{
Addr: port,
Handler: router,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
}
return nil
}
func (s *Server) shutdown(ctx context.Context) error {
log.ZInfo(ctx, "shutting down encryption server")
// Shutdown HTTP server
if s.httpServer != nil {
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := s.httpServer.Shutdown(shutdownCtx); err != nil {
log.ZError(ctx, "failed to shutdown HTTP server", err)
}
}
return nil
}
func (s *Server) corsMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
c.Header("Access-Control-Allow-Origin", "*")
c.Header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
c.Header("Access-Control-Allow-Headers", "Content-Type, Authorization")
if c.Request.Method == "OPTIONS" {
c.AbortWithStatus(204)
return
}
c.Next()
}
}
func (s *Server) loggingMiddleware() gin.HandlerFunc {
return gin.LoggerWithFormatter(func(param gin.LogFormatterParams) string {
return fmt.Sprintf("[%s] %s %s %d %s\n",
param.TimeStamp.Format("2006/01/02 - 15:04:05"),
param.Method,
param.Path,
param.StatusCode,
param.Latency,
)
})
}
// HealthCheck handles GET /api/v1/encryption/health
func (s *Server) HealthCheck(c *gin.Context) {
c.JSON(200, APIResponse{
Code: 0,
Message: "success",
Data: map[string]interface{}{
"status": "healthy",
"mode": s.Config.GetEncryptionMode(),
"timestamp": time.Now().Unix(),
},
})
}

@ -0,0 +1,91 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stores
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model/signal"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
)
type IdentityStore struct {
coll *mongo.Collection
}
func NewIdentityStore(db *mongo.Database) IdentityStoreInterface {
coll := db.Collection(signal.SignalIdentityKeyCollection)
// Create indexes
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{{Key: "user_id", Value: 1}, {Key: "device_id", Value: 1}},
Options: options.Index().SetUnique(true),
},
{Keys: bson.D{{Key: "user_id", Value: 1}}},
})
if err != nil {
log.ZWarn(context.Background(), "failed to create indexes for identity store", err)
}
return &IdentityStore{coll: coll}
}
func (s *IdentityStore) Create(ctx context.Context, identityKey *signal.SignalIdentityKey) error {
return mongoutil.InsertOne(ctx, s.coll, identityKey)
}
func (s *IdentityStore) Update(ctx context.Context, userID string, deviceID int32, identityKey *signal.SignalIdentityKey) error {
filter := bson.M{"user_id": userID, "device_id": deviceID}
update := bson.M{"$set": identityKey}
return mongoutil.UpdateOne(ctx, s.coll, filter, update, false)
}
func (s *IdentityStore) Get(ctx context.Context, userID string, deviceID int32) (*signal.SignalIdentityKey, error) {
filter := bson.M{"user_id": userID, "device_id": deviceID}
identityKey, err := mongoutil.FindOne[*signal.SignalIdentityKey](ctx, s.coll, filter)
if err != nil {
if errs.ErrRecordNotFound.Is(err) {
return nil, fmt.Errorf("identity key not found for user %s device %d", userID, deviceID)
}
return nil, err
}
return identityKey, nil
}
func (s *IdentityStore) Delete(ctx context.Context, userID string, deviceID int32) error {
filter := bson.M{"user_id": userID, "device_id": deviceID}
return mongoutil.DeleteOne(ctx, s.coll, filter)
}
func (s *IdentityStore) GetByUserID(ctx context.Context, userID string) ([]*signal.SignalIdentityKey, error) {
filter := bson.M{"user_id": userID}
return mongoutil.Find[*signal.SignalIdentityKey](ctx, s.coll, filter)
}
func (s *IdentityStore) Exists(ctx context.Context, userID string, deviceID int32) (bool, error) {
filter := bson.M{"user_id": userID, "device_id": deviceID}
count, err := s.coll.CountDocuments(ctx, filter)
if err != nil {
return false, err
}
return count > 0, nil
}

@ -0,0 +1,59 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stores
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model/signal"
)
// IdentityStoreInterface defines the interface for identity key storage operations
type IdentityStoreInterface interface {
Create(ctx context.Context, identityKey *signal.SignalIdentityKey) error
Update(ctx context.Context, userID string, deviceID int32, identityKey *signal.SignalIdentityKey) error
Get(ctx context.Context, userID string, deviceID int32) (*signal.SignalIdentityKey, error)
Delete(ctx context.Context, userID string, deviceID int32) error
GetByUserID(ctx context.Context, userID string) ([]*signal.SignalIdentityKey, error)
Exists(ctx context.Context, userID string, deviceID int32) (bool, error)
}
// PreKeyStoreInterface defines the interface for one-time prekey storage operations
type PreKeyStoreInterface interface {
Create(ctx context.Context, prekey *signal.SignalPreKey) error
CreateBatch(ctx context.Context, prekeys []*signal.SignalPreKey) error
GetAvailable(ctx context.Context, userID string, deviceID int32) (*signal.SignalPreKey, error)
MarkUsed(ctx context.Context, userID string, deviceID int32, keyID uint32) error
Delete(ctx context.Context, userID string, deviceID int32, keyID uint32) error
DeleteAllByUserDevice(ctx context.Context, userID string, deviceID int32) error
CountAvailable(ctx context.Context, userID string, deviceID int32) (int64, error)
GetByKeyID(ctx context.Context, userID string, deviceID int32, keyID uint32) (*signal.SignalPreKey, error)
CleanupUsed(ctx context.Context, olderThan time.Duration) (int64, error)
}
// SignedPreKeyStoreInterface defines the interface for signed prekey storage operations
type SignedPreKeyStoreInterface interface {
Create(ctx context.Context, signedPrekey *signal.SignalSignedPreKey) error
Update(ctx context.Context, userID string, deviceID int32, keyID uint32, signedPrekey *signal.SignalSignedPreKey) error
GetActive(ctx context.Context, userID string, deviceID int32) (*signal.SignalSignedPreKey, error)
GetByKeyID(ctx context.Context, userID string, deviceID int32, keyID uint32) (*signal.SignalSignedPreKey, error)
SetActive(ctx context.Context, userID string, deviceID int32, keyID uint32) error
Delete(ctx context.Context, userID string, deviceID int32, keyID uint32) error
GetAll(ctx context.Context, userID string, deviceID int32) ([]*signal.SignalSignedPreKey, error)
CleanupInactive(ctx context.Context, olderThan time.Duration) (int64, error)
Exists(ctx context.Context, userID string, deviceID int32) (bool, error)
}

@ -0,0 +1,157 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stores
import (
"context"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model/signal"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/log"
)
type PreKeyStore struct {
coll *mongo.Collection
}
func NewPreKeyStore(db *mongo.Database) PreKeyStoreInterface {
coll := db.Collection(signal.SignalPreKeyCollection)
// Create indexes
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{{Key: "user_id", Value: 1}, {Key: "device_id", Value: 1}, {Key: "key_id", Value: 1}},
Options: options.Index().SetUnique(true),
},
{Keys: bson.D{{Key: "user_id", Value: 1}, {Key: "device_id", Value: 1}}},
{Keys: bson.D{{Key: "user_id", Value: 1}, {Key: "device_id", Value: 1}, {Key: "used", Value: 1}}},
{Keys: bson.D{{Key: "created_time", Value: 1}}},
{Keys: bson.D{{Key: "used_time", Value: 1}}},
})
if err != nil {
log.ZWarn(context.Background(), "failed to create indexes for prekey store", err)
}
return &PreKeyStore{coll: coll}
}
func (s *PreKeyStore) Create(ctx context.Context, prekey *signal.SignalPreKey) error {
return mongoutil.InsertOne(ctx, s.coll, prekey)
}
func (s *PreKeyStore) CreateBatch(ctx context.Context, prekeys []*signal.SignalPreKey) error {
if len(prekeys) == 0 {
return nil
}
return mongoutil.InsertMany(ctx, s.coll, prekeys)
}
func (s *PreKeyStore) GetAvailable(ctx context.Context, userID string, deviceID int32) (*signal.SignalPreKey, error) {
filter := bson.M{
"user_id": userID,
"device_id": deviceID,
"used": false,
}
// Get one available prekey, sorted by creation time (FIFO)
opts := options.FindOne().SetSort(bson.D{{Key: "created_time", Value: 1}})
prekey, err := mongoutil.FindOne[*signal.SignalPreKey](ctx, s.coll, filter, opts)
if err != nil {
return nil, err
}
return prekey, nil
}
func (s *PreKeyStore) MarkUsed(ctx context.Context, userID string, deviceID int32, keyID uint32) error {
filter := bson.M{
"user_id": userID,
"device_id": deviceID,
"key_id": keyID,
}
now := time.Now()
update := bson.M{
"$set": bson.M{
"used": true,
"used_time": now,
},
}
return mongoutil.UpdateOne(ctx, s.coll, filter, update, false)
}
func (s *PreKeyStore) Delete(ctx context.Context, userID string, deviceID int32, keyID uint32) error {
filter := bson.M{
"user_id": userID,
"device_id": deviceID,
"key_id": keyID,
}
return mongoutil.DeleteOne(ctx, s.coll, filter)
}
func (s *PreKeyStore) DeleteAllByUserDevice(ctx context.Context, userID string, deviceID int32) error {
filter := bson.M{
"user_id": userID,
"device_id": deviceID,
}
return mongoutil.DeleteMany(ctx, s.coll, filter)
}
func (s *PreKeyStore) CountAvailable(ctx context.Context, userID string, deviceID int32) (int64, error) {
filter := bson.M{
"user_id": userID,
"device_id": deviceID,
"used": false,
}
return s.coll.CountDocuments(ctx, filter)
}
func (s *PreKeyStore) GetByKeyID(ctx context.Context, userID string, deviceID int32, keyID uint32) (*signal.SignalPreKey, error) {
filter := bson.M{
"user_id": userID,
"device_id": deviceID,
"key_id": keyID,
}
prekey, err := mongoutil.FindOne[*signal.SignalPreKey](ctx, s.coll, filter)
if err != nil {
return nil, err
}
return prekey, nil
}
func (s *PreKeyStore) CleanupUsed(ctx context.Context, olderThan time.Duration) (int64, error) {
cutoffTime := time.Now().Add(-olderThan)
filter := bson.M{
"used": true,
"used_time": bson.M{"$lt": cutoffTime},
}
result, err := s.coll.DeleteMany(ctx, filter)
if err != nil {
return 0, err
}
return result.DeletedCount, nil
}

@ -0,0 +1,187 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stores
import (
"context"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model/signal"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/log"
)
type SignedPreKeyStore struct {
coll *mongo.Collection
}
func NewSignedPreKeyStore(db *mongo.Database) SignedPreKeyStoreInterface {
coll := db.Collection(signal.SignalSignedPreKeyCollection)
// Create indexes
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{{Key: "user_id", Value: 1}, {Key: "device_id", Value: 1}, {Key: "key_id", Value: 1}},
Options: options.Index().SetUnique(true),
},
{Keys: bson.D{{Key: "user_id", Value: 1}, {Key: "device_id", Value: 1}}},
{Keys: bson.D{{Key: "user_id", Value: 1}, {Key: "device_id", Value: 1}, {Key: "active", Value: 1}}},
{Keys: bson.D{{Key: "active", Value: 1}, {Key: "created_time", Value: 1}}},
{Keys: bson.D{{Key: "created_time", Value: 1}}},
})
if err != nil {
log.ZWarn(context.Background(), "failed to create indexes for signed prekey store", err)
}
return &SignedPreKeyStore{coll: coll}
}
func (s *SignedPreKeyStore) Create(ctx context.Context, signedPrekey *signal.SignalSignedPreKey) error {
return mongoutil.InsertOne(ctx, s.coll, signedPrekey)
}
func (s *SignedPreKeyStore) Update(ctx context.Context, userID string, deviceID int32, keyID uint32, signedPrekey *signal.SignalSignedPreKey) error {
filter := bson.M{
"user_id": userID,
"device_id": deviceID,
"key_id": keyID,
}
update := bson.M{"$set": signedPrekey}
return mongoutil.UpdateOne(ctx, s.coll, filter, update, false)
}
func (s *SignedPreKeyStore) GetActive(ctx context.Context, userID string, deviceID int32) (*signal.SignalSignedPreKey, error) {
filter := bson.M{
"user_id": userID,
"device_id": deviceID,
"active": true,
}
// Get the most recent active signed prekey
opts := options.FindOne().SetSort(bson.D{{Key: "created_time", Value: -1}})
signedPrekey, err := mongoutil.FindOne[*signal.SignalSignedPreKey](ctx, s.coll, filter, opts)
if err != nil {
return nil, err
}
return signedPrekey, nil
}
func (s *SignedPreKeyStore) GetByKeyID(ctx context.Context, userID string, deviceID int32, keyID uint32) (*signal.SignalSignedPreKey, error) {
filter := bson.M{
"user_id": userID,
"device_id": deviceID,
"key_id": keyID,
}
signedPrekey, err := mongoutil.FindOne[*signal.SignalSignedPreKey](ctx, s.coll, filter)
if err != nil {
return nil, err
}
return signedPrekey, nil
}
func (s *SignedPreKeyStore) SetActive(ctx context.Context, userID string, deviceID int32, keyID uint32) error {
session, err := s.coll.Database().Client().StartSession()
if err != nil {
return err
}
defer session.EndSession(ctx)
// Use transaction to ensure atomicity
_, err = session.WithTransaction(ctx, func(sc mongo.SessionContext) (interface{}, error) {
// First, deactivate all existing signed prekeys for this user/device
deactivateFilter := bson.M{
"user_id": userID,
"device_id": deviceID,
}
deactivateUpdate := bson.M{
"$set": bson.M{"active": false},
}
_, err := s.coll.UpdateMany(sc, deactivateFilter, deactivateUpdate)
if err != nil {
return nil, err
}
// Then, activate the specified signed prekey
activateFilter := bson.M{
"user_id": userID,
"device_id": deviceID,
"key_id": keyID,
}
activateUpdate := bson.M{
"$set": bson.M{"active": true},
}
return s.coll.UpdateOne(sc, activateFilter, activateUpdate)
})
return err
}
func (s *SignedPreKeyStore) Delete(ctx context.Context, userID string, deviceID int32, keyID uint32) error {
filter := bson.M{
"user_id": userID,
"device_id": deviceID,
"key_id": keyID,
}
return mongoutil.DeleteOne(ctx, s.coll, filter)
}
func (s *SignedPreKeyStore) GetAll(ctx context.Context, userID string, deviceID int32) ([]*signal.SignalSignedPreKey, error) {
filter := bson.M{
"user_id": userID,
"device_id": deviceID,
}
// Sort by creation time, newest first
opts := options.Find().SetSort(bson.D{{Key: "created_time", Value: -1}})
return mongoutil.Find[*signal.SignalSignedPreKey](ctx, s.coll, filter, opts)
}
func (s *SignedPreKeyStore) CleanupInactive(ctx context.Context, olderThan time.Duration) (int64, error) {
cutoffTime := time.Now().Add(-olderThan)
filter := bson.M{
"active": false,
"created_time": bson.M{"$lt": cutoffTime},
}
result, err := s.coll.DeleteMany(ctx, filter)
if err != nil {
return 0, err
}
return result.DeletedCount, nil
}
func (s *SignedPreKeyStore) Exists(ctx context.Context, userID string, deviceID int32) (bool, error) {
filter := bson.M{
"user_id": userID,
"device_id": deviceID,
"active": true,
}
count, err := s.coll.CountDocuments(ctx, filter)
if err != nil {
return false, err
}
return count > 0, nil
}

@ -385,6 +385,7 @@ type RpcService struct {
Auth string `yaml:"auth"` Auth string `yaml:"auth"`
Conversation string `yaml:"conversation"` Conversation string `yaml:"conversation"`
Third string `yaml:"third"` Third string `yaml:"third"`
Encryption string `yaml:"encryption"`
} }
func (r *RpcService) GetServiceNames() []string { func (r *RpcService) GetServiceNames() []string {
@ -398,6 +399,7 @@ func (r *RpcService) GetServiceNames() []string {
r.Auth, r.Auth,
r.Conversation, r.Conversation,
r.Third, r.Third,
r.Encryption,
} }
} }

@ -0,0 +1,67 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signal
import (
"context"
"time"
)
const (
SignalIdentityKeyCollection = "signal_identity_keys"
)
// SignalIdentityKey represents the identity key for Signal protocol
type SignalIdentityKey struct {
UserID string `bson:"user_id" json:"userID"`
DeviceID int32 `bson:"device_id" json:"deviceID"`
IdentityKey []byte `bson:"identity_key" json:"identityKey"`
RegistrationID int32 `bson:"registration_id" json:"registrationID"`
CreatedTime time.Time `bson:"created_time" json:"createdTime"`
UpdatedTime time.Time `bson:"updated_time" json:"updatedTime"`
}
type SignalIdentityKeyModelInterface interface {
// Create creates a new identity key record
Create(ctx context.Context, identityKey *SignalIdentityKey) error
// Update updates an existing identity key
Update(ctx context.Context, userID string, deviceID int32, identityKey *SignalIdentityKey) error
// Get retrieves an identity key by user ID and device ID
Get(ctx context.Context, userID string, deviceID int32) (*SignalIdentityKey, error)
// Delete removes an identity key
Delete(ctx context.Context, userID string, deviceID int32) error
// GetByUserID retrieves all identity keys for a user
GetByUserID(ctx context.Context, userID string) ([]*SignalIdentityKey, error)
// Exists checks if an identity key exists
Exists(ctx context.Context, userID string, deviceID int32) (bool, error)
}
func (SignalIdentityKey) TableName() string {
return SignalIdentityKeyCollection
}
// Indexes returns the indexes for the collection
func (SignalIdentityKey) Indexes() []string {
return []string{
"user_id",
"user_id_device_id", // compound index for (user_id, device_id)
"created_time",
}
}

@ -0,0 +1,80 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signal
import (
"context"
"time"
)
const (
SignalPreKeyCollection = "signal_prekeys"
)
// SignalPreKey represents one-time prekeys for Signal protocol
type SignalPreKey struct {
UserID string `bson:"user_id" json:"userID"`
DeviceID int32 `bson:"device_id" json:"deviceID"`
KeyID uint32 `bson:"key_id" json:"keyID"`
PublicKey []byte `bson:"public_key" json:"publicKey"`
Used bool `bson:"used" json:"used"`
CreatedTime time.Time `bson:"created_time" json:"createdTime"`
UsedTime *time.Time `bson:"used_time,omitempty" json:"usedTime,omitempty"`
}
type SignalPreKeyModelInterface interface {
// Create creates a new prekey record
Create(ctx context.Context, prekey *SignalPreKey) error
// CreateBatch creates multiple prekey records in batch
CreateBatch(ctx context.Context, prekeys []*SignalPreKey) error
// GetAvailable retrieves an available (unused) prekey for a user/device
GetAvailable(ctx context.Context, userID string, deviceID int32) (*SignalPreKey, error)
// MarkUsed marks a prekey as used
MarkUsed(ctx context.Context, userID string, deviceID int32, keyID uint32) error
// Delete removes a prekey
Delete(ctx context.Context, userID string, deviceID int32, keyID uint32) error
// DeleteAllByUserDevice removes all prekeys for a user/device
DeleteAllByUserDevice(ctx context.Context, userID string, deviceID int32) error
// CountAvailable returns the count of available prekeys for a user/device
CountAvailable(ctx context.Context, userID string, deviceID int32) (int64, error)
// GetByKeyID retrieves a specific prekey by key ID
GetByKeyID(ctx context.Context, userID string, deviceID int32, keyID uint32) (*SignalPreKey, error)
// CleanupUsed removes used prekeys older than the specified duration
CleanupUsed(ctx context.Context, olderThan time.Duration) (int64, error)
}
func (SignalPreKey) TableName() string {
return SignalPreKeyCollection
}
// Indexes returns the indexes for the collection
func (SignalPreKey) Indexes() []string {
return []string{
"user_id",
"user_id_device_id", // compound index for (user_id, device_id)
"user_id_device_id_key_id", // compound unique index for (user_id, device_id, key_id)
"user_id_device_id_used", // compound index for (user_id, device_id, used)
"used_time",
"created_time",
}
}

@ -0,0 +1,80 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signal
import (
"context"
"time"
)
const (
SignalSignedPreKeyCollection = "signal_signed_prekeys"
)
// SignalSignedPreKey represents signed prekeys for Signal protocol
type SignalSignedPreKey struct {
UserID string `bson:"user_id" json:"userID"`
DeviceID int32 `bson:"device_id" json:"deviceID"`
KeyID uint32 `bson:"key_id" json:"keyID"`
PublicKey []byte `bson:"public_key" json:"publicKey"`
Signature []byte `bson:"signature" json:"signature"`
CreatedTime time.Time `bson:"created_time" json:"createdTime"`
Active bool `bson:"active" json:"active"` // Whether this is the active signed prekey
}
type SignalSignedPreKeyModelInterface interface {
// Create creates a new signed prekey record
Create(ctx context.Context, signedPrekey *SignalSignedPreKey) error
// Update updates an existing signed prekey (for rotation)
Update(ctx context.Context, userID string, deviceID int32, keyID uint32, signedPrekey *SignalSignedPreKey) error
// GetActive retrieves the active signed prekey for a user/device
GetActive(ctx context.Context, userID string, deviceID int32) (*SignalSignedPreKey, error)
// GetByKeyID retrieves a specific signed prekey by key ID
GetByKeyID(ctx context.Context, userID string, deviceID int32, keyID uint32) (*SignalSignedPreKey, error)
// SetActive marks a signed prekey as active and deactivates others
SetActive(ctx context.Context, userID string, deviceID int32, keyID uint32) error
// Delete removes a signed prekey
Delete(ctx context.Context, userID string, deviceID int32, keyID uint32) error
// GetAll retrieves all signed prekeys for a user/device
GetAll(ctx context.Context, userID string, deviceID int32) ([]*SignalSignedPreKey, error)
// CleanupInactive removes inactive signed prekeys older than the specified duration
CleanupInactive(ctx context.Context, olderThan time.Duration) (int64, error)
// Exists checks if a signed prekey exists
Exists(ctx context.Context, userID string, deviceID int32) (bool, error)
}
func (SignalSignedPreKey) TableName() string {
return SignalSignedPreKeyCollection
}
// Indexes returns the indexes for the collection
func (SignalSignedPreKey) Indexes() []string {
return []string{
"user_id",
"user_id_device_id", // compound index for (user_id, device_id)
"user_id_device_id_key_id", // compound unique index for (user_id, device_id, key_id)
"user_id_device_id_active", // compound index for (user_id, device_id, active)
"active_created_time", // compound index for (active, created_time)
"created_time",
}
}

@ -11,6 +11,7 @@ serviceBinaries:
openim-rpc-friend: 1 openim-rpc-friend: 1
openim-rpc-msg: 1 openim-rpc-msg: 1
openim-rpc-third: 1 openim-rpc-third: 1
openim-rpc-encryption: 1
toolBinaries: toolBinaries:
- check-free-memory - check-free-memory
- check-component - check-component

Loading…
Cancel
Save