diff --git a/cmd/api/main.go b/cmd/api/main.go index 4d8e521fb..b5e183a61 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -7,6 +7,7 @@ import ( "OpenIM/pkg/common/log" "OpenIM/pkg/common/mw" "context" + "errors" "fmt" "github.com/OpenIMSDK/openKeeper" "net" @@ -43,7 +44,9 @@ func run(port int) error { } fmt.Println("start api server, address: ", address, ", OpenIM version: ", config.Version) log.ZInfo(context.Background(), "start server success", "address", address, "version", config.Version) - log.Info("s", "start server") + log.ZDebug(context.Background(), "start server success", "address", address, "version", config.Version) + log.ZError(context.Background(), "start server success", errors.New("ss"), "address", address) + log.ZWarn(context.Background(), "start server success", errors.New("ss"), "address", address) err = router.Run(address) if err != nil { log.Error("", "api run failed ", address, err.Error()) diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 06f697481..01e317a46 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -13,7 +13,6 @@ import ( pbConversation "OpenIM/pkg/proto/conversation" "OpenIM/pkg/utils" "context" - "github.com/dtm-labs/rockscache" "google.golang.org/grpc" ) @@ -35,12 +34,9 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e if err != nil { return err } - opts := rockscache.NewDefaultOptions() - opts.RandomExpireAdjustment = 0.2 - opts.StrongConsistency = true pbConversation.RegisterConversationServer(server, &conversationServer{ groupChecker: check.NewGroupChecker(client), - ConversationDatabase: controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(rdb, opts), tx.NewGorm(db)), + ConversationDatabase: controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(rdb, cache.GetDefaultOpt()), tx.NewGorm(db)), }) return nil } diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 06922ba13..61afd4b4e 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -3,6 +3,7 @@ package cache import ( "OpenIM/pkg/common/config" "OpenIM/pkg/common/constant" + "OpenIM/pkg/common/log" "OpenIM/pkg/common/tracelog" pbMsg "OpenIM/pkg/proto/msg" "OpenIM/pkg/proto/sdkws" @@ -93,6 +94,38 @@ type cache struct { rdb redis.UniversalClient } +// 兼容老版本调用 +func (c *cache) DelKeys() { + for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE:", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE:", "JOINED_GROUP_LIST_CACHE:", + "GROUP_MEMBER_INFO_CACHE:", "GROUP_ALL_MEMBER_INFO_CACHE:", "ALL_FRIEND_INFO_CACHE:"} { + fName := utils.GetSelfFuncName() + var cursor uint64 + var n int + for { + var keys []string + var err error + keys, cursor, err = c.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result() + if err != nil { + panic(err.Error()) + } + n += len(keys) + // for each for redis cluster + for _, key := range keys { + if err = c.rdb.Del(context.Background(), key).Err(); err != nil { + log.NewError("", fName, key, err.Error()) + err = c.rdb.Del(context.Background(), key).Err() + if err != nil { + panic(err.Error()) + } + } + } + if cursor == 0 { + break + } + } + } +} + func (c *cache) IncrUserSeq(ctx context.Context, userID string) (int64, error) { return utils.Wrap2(c.rdb.Get(ctx, userIncrSeq+userID).Int64()) } diff --git a/pkg/common/db/cache/rockscache.go b/pkg/common/db/cache/rockscache.go index 35c025b65..1969da554 100644 --- a/pkg/common/db/cache/rockscache.go +++ b/pkg/common/db/cache/rockscache.go @@ -1,6 +1,7 @@ package cache import ( + "OpenIM/pkg/common/log" "OpenIM/pkg/utils" "context" "encoding/json" @@ -10,36 +11,13 @@ import ( const scanCount = 3000 -//func (rc *RcClient) DelKeys() { -// for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE", groupOwnerIDCache, joinedGroupListCache, -// groupMemberInfoCache, groupAllMemberInfoCache, "ALL_FRIEND_INFO_CACHE:"} { -// fName := utils.GetSelfFuncName() -// var cursor uint64 -// var n int -// for { -// var keys []string -// var err error -// keys, cursor, err = rc.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result() -// if err != nil { -// panic(err.Error()) -// } -// n += len(keys) -// // for each for redis cluster -// for _, key := range keys { -// if err = rc.rdb.Del(context.Background(), key).Err(); err != nil { -// log.NewError("", fName, key, err.Error()) -// err = rc.rdb.Del(context.Background(), key).Err() -// if err != nil { -// panic(err.Error()) -// } -// } -// } -// if cursor == 0 { -// break -// } -// } -// } -//} + +func GetDefaultOpt() rockscache.Options { + opts := rockscache.NewDefaultOptions() + opts.StrongConsistency = true + opts.RandomExpireAdjustment = 0.2 + return opts +} func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) { var t T diff --git a/pkg/common/log/logrus.go b/pkg/common/log/logrus.go index cdf783e1a..1e2b9c531 100644 --- a/pkg/common/log/logrus.go +++ b/pkg/common/log/logrus.go @@ -107,18 +107,6 @@ func InfoKv(ctx context.Context, msg string, keysAndValues ...interface{}) { }).Infoln(keysAndValues) } -func DebugKv(ctx context.Context, msg string, keysAndValues ...interface{}) { - -} - -func ErrorKv(ctx context.Context, msg string, err error, keysAndValues ...interface{}) { - -} - -func WarnKv(ctx context.Context, msg string, err error, keysAndValues ...interface{}) { - -} - func Info(OperationID string, args ...interface{}) { logger.WithFields(logrus.Fields{ "OperationID": OperationID, diff --git a/pkg/common/log/zap.go b/pkg/common/log/zap.go index 2b51cb015..068ca2bb2 100644 --- a/pkg/common/log/zap.go +++ b/pkg/common/log/zap.go @@ -63,7 +63,14 @@ func NewZapLogger() (*ZapLogger, error) { if config.Config.Log.Stderr { zapConfig.OutputPaths = append(zapConfig.OutputPaths, "stderr") } - l, err := zapConfig.Build(zl.cores()) + zapConfig.EncoderConfig.EncodeTime = zl.timeEncoder + zapConfig.EncoderConfig.EncodeDuration = zapcore.SecondsDurationEncoder + zapConfig.EncoderConfig.EncodeLevel = zapcore.LowercaseColorLevelEncoder + opts, err := zl.cores() + if err != nil { + return nil, err + } + l, err := zapConfig.Build(opts) if err != nil { return nil, err } @@ -71,13 +78,16 @@ func NewZapLogger() (*ZapLogger, error) { return zl, nil } -func timeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) { +func (l *ZapLogger) timeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) { enc.AppendString(t.Format("2006-01-02 15:04:05")) } -func (l *ZapLogger) cores() zap.Option { +func (l *ZapLogger) cores() (zap.Option, error) { fileEncoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()) - writer := l.getWriter() + writer, err := l.getWriter() + if err != nil { + return nil, err + } var cores []zapcore.Core if config.Config.Log.StorageLocation != "" { cores = []zapcore.Core{ @@ -86,16 +96,18 @@ func (l *ZapLogger) cores() zap.Option { } return zap.WrapCore(func(c zapcore.Core) zapcore.Core { return zapcore.NewTee(cores...) - }) + }), nil } -func (l *ZapLogger) getWriter() zapcore.WriteSyncer { - logf, _ := rotatelogs.New(config.Config.Log.StorageLocation+sp+"openIM"+"-"+".%Y_%m%d_%H", - rotatelogs.WithLinkName(config.Config.Log.StorageLocation+sp+"openIM"+"-"), - rotatelogs.WithMaxAge(2*24*time.Hour), - rotatelogs.WithRotationTime(time.Minute), +func (l *ZapLogger) getWriter() (zapcore.WriteSyncer, error) { + logf, err := rotatelogs.New(config.Config.Log.StorageLocation+sp+"OpenIM.log.all"+".%Y-%m-%d", + rotatelogs.WithRotationCount(config.Config.Log.RemainRotationCount), + rotatelogs.WithRotationTime(time.Duration(config.Config.Log.RotationTime)*time.Hour), ) - return zapcore.AddSync(logf) + if err != nil { + return nil, err + } + return zapcore.AddSync(logf), nil } func (l *ZapLogger) ToZap() *zap.SugaredLogger {