From d014cf8ac90bf96ecd17fdd8a97c8d8208fbbc01 Mon Sep 17 00:00:00 2001 From: "Xinwei Xiong(cubxxw-openim)" <3293172751nss@gmail.com> Date: Fri, 14 Jul 2023 13:21:43 +0800 Subject: [PATCH] feat: fix scripts support win Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> --- internal/api/group.go | 1 + internal/api/route.go | 10 ++--- internal/api/user.go | 6 +-- internal/msggateway/client.go | 26 ++++++----- internal/msggateway/compressor.go | 1 + internal/msggateway/constant.go | 4 +- internal/msggateway/context.go | 11 +++++ internal/msggateway/encoder.go | 5 ++- internal/msggateway/hub_server.go | 2 +- internal/msggateway/long_conn.go | 13 +++--- internal/msggateway/message_handler.go | 7 ++- internal/msggateway/n_ws_server.go | 11 ++--- internal/msggateway/options.go | 27 +++++++----- internal/msggateway/user_map.go | 7 ++- internal/msgtransfer/init.go | 9 ++-- .../msgtransfer/online_history_msg_handler.go | 32 ++++++++------ .../online_msg_to_mongo_handler.go | 6 ++- .../msgtransfer/persistent_msg_handler.go | 10 +++-- internal/push/offlinepush/fcm/push.go | 6 +-- internal/push/offlinepush/getui/body.go | 2 +- internal/push/offlinepush/getui/push.go | 11 +++-- .../push/offlinepush/jpush/body/platform.go | 3 +- .../push/offlinepush/jpush/body/pushobj.go | 1 + internal/push/push_handler.go | 9 ++-- internal/push/push_to_client.go | 8 ++-- internal/rpc/conversation/conversaion.go | 4 +- internal/rpc/friend/friend.go | 20 ++++----- internal/rpc/group/group.go | 17 +++++--- internal/rpc/group/statistics.go | 3 +- internal/rpc/msg/as_read.go | 2 +- internal/rpc/msg/lock.go | 6 ++- internal/rpc/msg/send.go | 4 +- internal/rpc/msg/server.go | 31 +++++++------ internal/rpc/msg/statistics.go | 3 +- internal/rpc/msg/sync_msg.go | 1 + internal/rpc/msg/verify.go | 5 +-- internal/rpc/third/s3.go | 3 +- internal/rpc/third/third.go | 5 ++- internal/rpc/third/tool.go | 5 ++- internal/tools/msg.go | 3 +- pkg/common/config/config.go | 2 +- pkg/common/config/parse.go | 2 +- pkg/common/constant/constant.go | 40 ++++++++--------- pkg/common/constant/platform_id_to_name.go | 8 ++-- pkg/common/convert/group.go | 2 +- pkg/common/convert/msg.go | 1 - pkg/common/db/cache/black.go | 6 +-- pkg/common/db/cache/conversation.go | 2 +- pkg/common/db/cache/friend.go | 4 +- pkg/common/db/cache/group.go | 11 ++--- pkg/common/db/cache/init_redis.go | 4 +- pkg/common/db/cache/msg.go | 4 +- pkg/common/db/controller/auth.go | 8 ++-- pkg/common/db/controller/black.go | 8 ++-- pkg/common/db/controller/conversation.go | 17 ++++---- pkg/common/db/controller/friend.go | 32 +++++++------- pkg/common/db/controller/group.go | 4 +- pkg/common/db/controller/msg.go | 27 ++++++++---- pkg/common/db/controller/s3.go | 5 ++- pkg/common/db/controller/user.go | 28 ++++++------ pkg/common/db/localcache/conversation.go | 1 - pkg/common/db/relation/friend_model.go | 22 +++++----- .../db/relation/friend_request_model.go | 14 +++--- pkg/common/db/relation/group_model.go | 5 ++- pkg/common/db/relation/mysql_init.go | 8 ++-- pkg/common/db/relation/user_model.go | 12 +++--- pkg/common/db/s3/cont/controller.go | 7 +-- pkg/common/db/s3/cont/error.go | 1 + pkg/common/db/s3/cos/cos.go | 7 +-- pkg/common/db/s3/minio/minio.go | 6 +-- pkg/common/db/s3/oss/oss.go | 7 +-- pkg/common/db/s3/oss/sign.go | 3 +- pkg/common/db/table/unrelation/msg.go | 5 ++- pkg/common/db/unrelation/mongo.go | 4 +- pkg/common/db/unrelation/msg.go | 43 +++++++++++-------- pkg/common/kafka/consumer_group.go | 1 - pkg/common/kafka/producer.go | 20 +++++---- pkg/common/log/zap.go | 3 +- pkg/common/mw/gin.go | 2 +- pkg/common/prome/gather.go | 24 +++++++---- pkg/common/tokenverify/jwt_token.go | 13 +++--- pkg/discoveryregistry/zookeeper/discover.go | 7 +-- pkg/errs/code.go | 8 ++-- pkg/rpcclient/conversation.go | 1 + pkg/rpcclient/friend.go | 3 +- pkg/rpcclient/msg.go | 2 +- pkg/rpcclient/notification/conevrsation.go | 6 +-- pkg/rpcclient/notification/group.go | 6 ++- pkg/rpcclient/third.go | 5 ++- scripts/make-rules/golang.mk | 14 ++++-- 90 files changed, 463 insertions(+), 352 deletions(-) diff --git a/internal/api/group.go b/internal/api/group.go index 519c70004..00cdebeab 100644 --- a/internal/api/group.go +++ b/internal/api/group.go @@ -131,6 +131,7 @@ func (o *GroupApi) GetSuperGroupsInfo(c *gin.Context) { func (o *GroupApi) GroupCreateCount(c *gin.Context) { a2r.Call(group.GroupClient.GroupCreateCount, o.Client, c) } + func (o *GroupApi) GetGroups(c *gin.Context) { a2r.Call(group.GroupClient.GetGroups, o.Client, c) } diff --git a/internal/api/route.go b/internal/api/route.go index 421b33b47..2dfb9f6c4 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -72,7 +72,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive userRouterGroup.POST("/get_users_online_status", ParseToken, u.GetUsersOnlineStatus) userRouterGroup.POST("/get_users_online_token_detail", ParseToken, u.GetUsersOnlineTokenDetail) } - //friend routing group + // friend routing group friendRouterGroup := r.Group("/friend", ParseToken) { f := NewFriendApi(*friendRpc) @@ -120,7 +120,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive superGroupRouterGroup.POST("/get_joined_group_list", g.GetJoinedSuperGroupList) superGroupRouterGroup.POST("/get_groups_info", g.GetSuperGroupsInfo) } - //certificate + // certificate authRouterGroup := r.Group("/auth") { a := NewAuthApi(*authRpc) @@ -128,7 +128,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive authRouterGroup.POST("/parse_token", a.ParseToken) authRouterGroup.POST("/force_logout", ParseToken, a.ForceLogout) } - //Third service + // Third service thirdGroup := r.Group("/third", ParseToken) { t := NewThirdApi(*thirdRpc) @@ -145,7 +145,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive objectGroup.POST("/access_url", t.AccessURL) objectGroup.GET("/*name", t.ObjectRedirect) } - //Message + // Message msgGroup := r.Group("/msg", ParseToken) { msgGroup.POST("/newest_seq", m.GetSeq) @@ -167,7 +167,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive msgGroup.POST("/batch_send_msg", m.BatchSendMsg) msgGroup.POST("/check_msg_is_send_success", m.CheckMsgIsSendSuccess) } - //Conversation + // Conversation conversationGroup := r.Group("/conversation", ParseToken) { c := NewConversationApi(*conversationRpc) diff --git a/internal/api/user.go b/internal/api/user.go index 4218e6c70..28cb74509 100644 --- a/internal/api/user.go +++ b/internal/api/user.go @@ -78,7 +78,7 @@ func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) { var respResult []*msggateway.GetUsersOnlineStatusResp_SuccessResult flag := false - //Online push message + // Online push message for _, v := range conns { msgClient := msggateway.NewMsgGatewayClient(v) reply, err := msgClient.GetUsersOnlineStatus(c, &req) @@ -131,7 +131,7 @@ func (u *UserApi) GetUsersOnlineTokenDetail(c *gin.Context) { apiresp.GinError(c, err) return } - //Online push message + // Online push message for _, v := range conns { msgClient := msggateway.NewMsgGatewayClient(v) reply, err := msgClient.GetUsersOnlineStatus(c, &req) @@ -160,7 +160,6 @@ func (u *UserApi) GetUsersOnlineTokenDetail(c *gin.Context) { } } } - } for p, tokens := range m { t := new(msggateway.SinglePlatformToken) @@ -176,5 +175,4 @@ func (u *UserApi) GetUsersOnlineTokenDetail(c *gin.Context) { } apiresp.GinSuccess(c, respResult) - } diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 34f9cf47d..03e37fcc6 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -31,10 +31,12 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" ) -var ErrConnClosed = errors.New("conn has closed") -var ErrNotSupportMessageProtocol = errors.New("not support message protocol") -var ErrClientClosed = errors.New("client actively close the connection") -var ErrPanic = errors.New("panic error") +var ( + ErrConnClosed = errors.New("conn has closed") + ErrNotSupportMessageProtocol = errors.New("not support message protocol") + ErrClientClosed = errors.New("client actively close the connection") + ErrPanic = errors.New("panic error") +) const ( // MessageText is for UTF-8 encoded text messages like JSON. @@ -102,10 +104,12 @@ func (c *Client) ResetClient( c.closedErr = nil c.token = token } + func (c *Client) pongHandler(_ string) error { c.conn.SetReadDeadline(pongWait) return nil } + func (c *Client) readMessage() { defer func() { if r := recover(); r != nil { @@ -124,7 +128,7 @@ func (c *Client) readMessage() { return } log.ZDebug(c.ctx, "readMessage", "messageType", messageType) - if c.closed == true { //连接刚置位已经关闭,但是协程还没退出的场景 + if c.closed == true { // 连接刚置位已经关闭,但是协程还没退出的场景 c.closedErr = ErrConnClosed return } @@ -148,8 +152,8 @@ func (c *Client) readMessage() { default: } } - } + func (c *Client) handleMessage(message []byte) error { if c.IsCompress { var decompressErr error @@ -198,26 +202,26 @@ func (c *Client) handleMessage(message []byte) error { } c.replyMessage(ctx, &binaryReq, messageErr, resp) return nil - } + func (c *Client) setAppBackgroundStatus(ctx context.Context, req Req) ([]byte, error) { resp, isBackground, messageErr := c.longConnServer.SetUserDeviceBackground(ctx, req) if messageErr != nil { return nil, messageErr } c.IsBackground = isBackground - //todo callback + // todo callback return resp, nil - } + func (c *Client) close() { c.w.Lock() defer c.w.Unlock() c.closed = true c.conn.Close() c.longConnServer.UnRegister(c) - } + func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, resp []byte) { errResp := apiresp.ParseError(err) mReply := Resp{ @@ -234,6 +238,7 @@ func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, re log.ZWarn(ctx, "wireBinaryMsg replyMessage", err, "resp", mReply.String()) } } + func (c *Client) PushMessage(ctx context.Context, msgData *sdkws.MsgData) error { var msg sdkws.PushMessages conversationID := utils.GetConversationIDByMsg(msgData) @@ -296,5 +301,4 @@ func (c *Client) writePongMsg() error { } _ = c.conn.SetWriteDeadline(writeWait) return c.conn.WriteMessage(PongMessage, nil) - } diff --git a/internal/msggateway/compressor.go b/internal/msggateway/compressor.go index 97a9b1eff..99b827454 100644 --- a/internal/msggateway/compressor.go +++ b/internal/msggateway/compressor.go @@ -33,6 +33,7 @@ type GzipCompressor struct { func NewGzipCompressor() *GzipCompressor { return &GzipCompressor{compressProtocol: "gzip"} } + func (g *GzipCompressor) Compress(rawData []byte) ([]byte, error) { gzipBuffer := bytes.Buffer{} gz := gzip.NewWriter(&gzipBuffer) diff --git a/internal/msggateway/constant.go b/internal/msggateway/constant.go index 043c9d688..497930a6e 100644 --- a/internal/msggateway/constant.go +++ b/internal/msggateway/constant.go @@ -27,11 +27,13 @@ const ( GzipCompressionProtocol = "gzip" BackgroundStatus = "isBackground" ) + const ( WebSocket = iota + 1 ) + const ( - //Websocket Protocol + //Websocket Protocol. WSGetNewestSeq = 1001 WSPullMsgBySeqList = 1002 WSSendMsg = 1003 diff --git a/internal/msggateway/context.go b/internal/msggateway/context.go index e5ccc00f4..e1d066467 100644 --- a/internal/msggateway/context.go +++ b/internal/msggateway/context.go @@ -71,9 +71,11 @@ func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnCont ConnID: utils.Md5(req.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))), } } + func (c *UserConnContext) GetRemoteAddr() string { return c.RemoteAddr } + func (c *UserConnContext) Query(key string) (string, bool) { var value string if value = c.Req.URL.Query().Get(key); value == "" { @@ -81,6 +83,7 @@ func (c *UserConnContext) Query(key string) (string, bool) { } return value, true } + func (c *UserConnContext) GetHeader(key string) (string, bool) { var value string if value = c.Req.Header.Get(key); value == "" { @@ -88,27 +91,35 @@ func (c *UserConnContext) GetHeader(key string) (string, bool) { } return value, true } + func (c *UserConnContext) SetHeader(key, value string) { c.RespWriter.Header().Set(key, value) } + func (c *UserConnContext) ErrReturn(error string, code int) { http.Error(c.RespWriter, error, code) } + func (c *UserConnContext) GetConnID() string { return c.ConnID } + func (c *UserConnContext) GetUserID() string { return c.Req.URL.Query().Get(WsUserID) } + func (c *UserConnContext) GetPlatformID() string { return c.Req.URL.Query().Get(PlatformID) } + func (c *UserConnContext) GetOperationID() string { return c.Req.URL.Query().Get(OperationID) } + func (c *UserConnContext) GetToken() string { return c.Req.URL.Query().Get(Token) } + func (c *UserConnContext) GetBackground() bool { b, err := strconv.ParseBool(c.Req.URL.Query().Get(BackgroundStatus)) if err != nil { diff --git a/internal/msggateway/encoder.go b/internal/msggateway/encoder.go index a34a66e66..41ab02d0c 100644 --- a/internal/msggateway/encoder.go +++ b/internal/msggateway/encoder.go @@ -26,12 +26,12 @@ type Encoder interface { Decode(encodeData []byte, decodeData interface{}) error } -type GobEncoder struct { -} +type GobEncoder struct{} func NewGobEncoder() *GobEncoder { return &GobEncoder{} } + func (g *GobEncoder) Encode(data interface{}) ([]byte, error) { buff := bytes.Buffer{} enc := gob.NewEncoder(&buff) @@ -41,6 +41,7 @@ func (g *GobEncoder) Encode(data interface{}) ([]byte, error) { } return buff.Bytes(), nil } + func (g *GobEncoder) Decode(encodeData []byte, decodeData interface{}) error { buff := bytes.NewBuffer(encodeData) dec := gob.NewDecoder(buff) diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index ea7b9fd8d..81aade1cb 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -197,6 +197,6 @@ func (s *Server) MultiTerminalLoginCheck( ctx context.Context, req *msggateway.MultiTerminalLoginCheckReq, ) (*msggateway.MultiTerminalLoginCheckResp, error) { - //TODO implement me + // TODO implement me panic("implement me") } diff --git a/internal/msggateway/long_conn.go b/internal/msggateway/long_conn.go index 309ec2d8b..c2fa324a9 100644 --- a/internal/msggateway/long_conn.go +++ b/internal/msggateway/long_conn.go @@ -22,14 +22,14 @@ import ( ) type LongConn interface { - //Close this connection + // Close this connection Close() error // WriteMessage Write message to connection,messageType means data type,can be set binary(2) and text(1). WriteMessage(messageType int, message []byte) error // ReadMessage Read message from connection. ReadMessage() (int, []byte, error) // SetReadDeadline sets the read deadline on the underlying network connection, - //after a read has timed out, will return an error. + // after a read has timed out, will return an error. SetReadDeadline(timeout time.Duration) error // SetWriteDeadline sets to write deadline when send message,when read has timed out,will return error. SetWriteDeadline(timeout time.Duration) error @@ -58,6 +58,7 @@ func newGWebSocket(protocolType int, handshakeTimeout time.Duration) *GWebSocket func (d *GWebSocket) Close() error { return d.conn.Close() } + func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) error { upgrader := &websocket.Upgrader{ HandshakeTimeout: d.handshakeTimeout, @@ -69,10 +70,10 @@ func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) er } d.conn = conn return nil - +} } func (d *GWebSocket) WriteMessage(messageType int, message []byte) error { - //d.setSendConn(d.conn) + // d.setSendConn(d.conn) return d.conn.WriteMessage(messageType, message) } @@ -83,6 +84,7 @@ func (d *GWebSocket) WriteMessage(messageType int, message []byte) error { func (d *GWebSocket) ReadMessage() (int, []byte, error) { return d.conn.ReadMessage() } + func (d *GWebSocket) SetReadDeadline(timeout time.Duration) error { return d.conn.SetReadDeadline(time.Now().Add(timeout)) } @@ -97,7 +99,6 @@ func (d *GWebSocket) Dial(urlStr string, requestHeader http.Header) (*http.Respo d.conn = conn } return httpResp, err - } func (d *GWebSocket) IsNil() bool { @@ -110,9 +111,11 @@ func (d *GWebSocket) IsNil() bool { func (d *GWebSocket) SetConnNil() { d.conn = nil } + func (d *GWebSocket) SetReadLimit(limit int64) { d.conn.SetReadLimit(limit) } + func (d *GWebSocket) SetPongHandler(handler PongHandler) { d.conn.SetPongHandler(handler) } diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index 874101a05..bc650aae6 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -75,8 +75,10 @@ type GrpcHandler struct { func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDiscoveryRegistry) *GrpcHandler { msgRpcClient := rpcclient.NewMessageRpcClient(client) pushRpcClient := rpcclient.NewPushRpcClient(client) - return &GrpcHandler{msgRpcClient: &msgRpcClient, - pushClient: &pushRpcClient, validate: validate} + return &GrpcHandler{ + msgRpcClient: &msgRpcClient, + pushClient: &pushRpcClient, validate: validate, + } } func (g GrpcHandler) GetSeq(context context.Context, data Req) ([]byte, error) { @@ -164,6 +166,7 @@ func (g GrpcHandler) UserLogout(context context.Context, data Req) ([]byte, erro } return c, nil } + func (g GrpcHandler) SetUserDeviceBackground(_ context.Context, data Req) ([]byte, bool, error) { req := sdkws.SetAppBackgroundStatusReq{} if err := proto.Unmarshal(data.Data, &req); err != nil { diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index c204dc115..2988e2c08 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -86,6 +86,7 @@ type kickHandler struct { func (ws *WsServer) SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) { ws.MessageHandler = NewGrpcHandler(ws.validate, client) } + func (ws *WsServer) SetCacheHandler(cache cache.MsgModel) { ws.cache = cache } @@ -113,7 +114,6 @@ func NewWsServer(opts ...Option) (*WsServer, error) { } if config.port < 1024 { return nil, errors.New("port not allow to listen") - } v := validator.New() return &WsServer{ @@ -134,6 +134,7 @@ func NewWsServer(opts ...Option) (*WsServer, error) { Encoder: NewGobEncoder(), }, nil } + func (ws *WsServer) Run() error { var client *Client go func() { @@ -150,7 +151,7 @@ func (ws *WsServer) Run() error { }() http.HandleFunc("/", ws.wsHandler) // http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {}) - return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) //Start listening + return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) // Start listening } func (ws *WsServer) registerClient(client *Client) { @@ -165,7 +166,6 @@ func (ws *WsServer) registerClient(client *Client) { log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID) atomic.AddInt64(&ws.onlineUserNum, 1) atomic.AddInt64(&ws.onlineUserConnNum, 1) - } else { i := &kickHandler{ clientOK: clientOK, @@ -176,7 +176,7 @@ func (ws *WsServer) registerClient(client *Client) { log.ZDebug(client.ctx, "user exist", "userID", client.UserID, "platformID", client.PlatformID) if clientOK { ws.clients.Set(client.UserID, client) - //已经有同平台的连接存在 + // 已经有同平台的连接存在 log.ZInfo(client.ctx, "repeat login", "userID", client.UserID, "platformID", client.PlatformID, "old remote addr", getRemoteAdders(oldClients)) atomic.AddInt64(&ws.onlineUserConnNum, 1) } else { @@ -194,6 +194,7 @@ func (ws *WsServer) registerClient(client *Client) { ws.onlineUserConnNum, ) } + func getRemoteAdders(client []*Client) string { var ret string for i, c := range client { @@ -284,8 +285,8 @@ func (ws *WsServer) multiTerminalLoginChecker(info *kickHandler) { } } } - } + func (ws *WsServer) unregisterClient(client *Client) { defer ws.clientPool.Put(client) isDeleteUser := ws.clients.delete(client.UserID, client.ctx.GetRemoteAddr()) diff --git a/internal/msggateway/options.go b/internal/msggateway/options.go index cee415f99..24cbbe43f 100644 --- a/internal/msggateway/options.go +++ b/internal/msggateway/options.go @@ -16,33 +16,38 @@ package msggateway import "time" -type Option func(opt *configs) -type configs struct { - //长连接监听端口 - port int - //长连接允许最大链接数 - maxConnNum int64 - //连接握手超时时间 - handshakeTimeout time.Duration - //允许消息最大长度 - messageMaxMsgLength int -} +type ( + Option func(opt *configs) + configs struct { + // 长连接监听端口 + port int + // 长连接允许最大链接数 + maxConnNum int64 + // 连接握手超时时间 + handshakeTimeout time.Duration + // 允许消息最大长度 + messageMaxMsgLength int + } +) func WithPort(port int) Option { return func(opt *configs) { opt.port = port } } + func WithMaxConnNum(num int64) Option { return func(opt *configs) { opt.maxConnNum = num } } + func WithHandshakeTimeout(t time.Duration) Option { return func(opt *configs) { opt.handshakeTimeout = t } } + func WithMessageMaxMsgLength(length int) Option { return func(opt *configs) { opt.messageMaxMsgLength = length diff --git a/internal/msggateway/user_map.go b/internal/msggateway/user_map.go index 9e6757731..7e62557b2 100644 --- a/internal/msggateway/user_map.go +++ b/internal/msggateway/user_map.go @@ -29,6 +29,7 @@ type UserMap struct { func newUserMap() *UserMap { return &UserMap{} } + func (u *UserMap) GetAll(key string) ([]*Client, bool) { allClients, ok := u.m.Load(key) if ok { @@ -36,6 +37,7 @@ func (u *UserMap) GetAll(key string) ([]*Client, bool) { } return nil, ok } + func (u *UserMap) Get(key string, platformID int) ([]*Client, bool, bool) { allClients, userExisted := u.m.Load(key) if userExisted { @@ -47,12 +49,12 @@ func (u *UserMap) Get(key string, platformID int) ([]*Client, bool, bool) { } if len(clients) > 0 { return clients, userExisted, true - } return clients, userExisted, false } return nil, userExisted, false } + func (u *UserMap) Set(key string, v *Client) { allClients, existed := u.m.Load(key) if existed { @@ -67,6 +69,7 @@ func (u *UserMap) Set(key string, v *Client) { u.m.Store(key, clients) } } + func (u *UserMap) delete(key string, connRemoteAddr string) (isDeleteUser bool) { allClients, existed := u.m.Load(key) if existed { @@ -87,6 +90,7 @@ func (u *UserMap) delete(key string, connRemoteAddr string) (isDeleteUser bool) } return existed } + func (u *UserMap) deleteClients(key string, clients []*Client) (isDeleteUser bool) { m := utils.SliceToMapAny(clients, func(c *Client) (string, struct{}) { return c.ctx.GetRemoteAddr(), struct{}{} @@ -110,6 +114,7 @@ func (u *UserMap) deleteClients(key string, clients []*Client) (isDeleteUser boo } return existed } + func (u *UserMap) DeleteAll(key string) { u.m.Delete(key) } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 0a52dea96..aa31783b3 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -84,9 +84,12 @@ func StartTransfer(prometheusPort int) error { func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase, msgDatabase controller.CommonMsgDatabase, - conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) *MsgTransfer { - return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient), - historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase)} + conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, +) *MsgTransfer { + return &MsgTransfer{ + persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient), + historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase), + } } func (m *MsgTransfer) initPrometheus() { diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 574288de0..2a6f69f1a 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -38,10 +38,12 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" ) -const ConsumerMsgs = 3 -const SourceMessages = 4 -const MongoMessages = 5 -const ChannelNum = 100 +const ( + ConsumerMsgs = 3 + SourceMessages = 4 + MongoMessages = 5 + ChannelNum = 100 +) type MsgChannelValue struct { uniqueKey string @@ -85,7 +87,7 @@ func NewOnlineHistoryRedisConsumerHandler( ) *OnlineHistoryRedisConsumerHandler { var och OnlineHistoryRedisConsumerHandler och.msgDatabase = database - och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel + och.msgDistributionCh = make(chan Cmd2Value) // no buffer channel go och.MessagesDistributionHandle() for i := 0; i < ChannelNum; i++ { och.chArrays[i] = make(chan Cmd2Value, 50) @@ -93,8 +95,10 @@ func NewOnlineHistoryRedisConsumerHandler( } och.conversationRpcClient = conversationRpcClient och.groupRpcClient = groupRpcClient - och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.LatestMsgToRedis.Topic}, + och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ + KafkaVersion: sarama.V2_0_0_0, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, + }, []string{config.Config.Kafka.LatestMsgToRedis.Topic}, config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis) // statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d // second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) @@ -163,7 +167,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { } } -// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表, +// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,. func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList( totalMsgs []*ContextMsg, ) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) { @@ -312,7 +316,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { triggerChannelValue := cmd.Value.(TriggerChannelValue) ctx := triggerChannelValue.ctx consumerMessages := triggerChannelValue.cMsgList - //Aggregation map[userid]message list + // Aggregation map[userid]message list log.ZDebug(ctx, "batch messages come to distribution center", "length", len(consumerMessages)) for i := 0; i < len(consumerMessages); i++ { ctxMsg := &ContextMsg{} @@ -378,13 +382,13 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { } } } + func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Context { var allMessageOperationID string for i, v := range values { if opid := mcontext.GetOperationID(v.ctx); opid != "" { if i == 0 { allMessageOperationID += opid - } else { allMessageOperationID += "$" + opid } @@ -431,13 +435,15 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim( ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator()) log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg)) for i := 0; i < len(ccMsg)/split; i++ { - //log.Debug() + // log.Debug() och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - ctx: ctx, cMsgList: ccMsg[i*split : (i+1)*split]}} + ctx: ctx, cMsgList: ccMsg[i*split : (i+1)*split], + }} } if (len(ccMsg) % split) > 0 { och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - ctx: ctx, cMsgList: ccMsg[split*(len(ccMsg)/split):]}} + ctx: ctx, cMsgList: ccMsg[split*(len(ccMsg)/split):], + }} } log.ZDebug(ctx, "timer trigger msg consumer end", "length", len(ccMsg)) } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index a7bfa3c8f..f2f14f0ed 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -34,8 +34,10 @@ type OnlineHistoryMongoConsumerHandler struct { func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase) *OnlineHistoryMongoConsumerHandler { mc := &OnlineHistoryMongoConsumerHandler{ - historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic}, + historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{ + KafkaVersion: sarama.V2_0_0_0, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, + }, []string{config.Config.Kafka.MsgToMongo.Topic}, config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo), msgDatabase: database, } diff --git a/internal/msgtransfer/persistent_msg_handler.go b/internal/msgtransfer/persistent_msg_handler.go index 9ee0f5393..debc4905f 100644 --- a/internal/msgtransfer/persistent_msg_handler.go +++ b/internal/msgtransfer/persistent_msg_handler.go @@ -36,8 +36,10 @@ type PersistentConsumerHandler struct { func NewPersistentConsumerHandler(database controller.ChatLogDatabase) *PersistentConsumerHandler { return &PersistentConsumerHandler{ - persistentConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.LatestMsgToRedis.Topic}, + persistentConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{ + KafkaVersion: sarama.V2_0_0_0, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, + }, []string{config.Config.Kafka.LatestMsgToRedis.Topic}, config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql), chatLogDatabase: database, } @@ -59,9 +61,9 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql( } return log.ZDebug(ctx, "handleChatWs2Mysql", "msg", msgFromMQ.MsgData) - //Control whether to store history messages (mysql) + // Control whether to store history messages (mysql) isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) - //Only process receiver data + // Only process receiver data if isPersist { switch msgFromMQ.MsgData.SessionType { case constant.SingleChatType, constant.NotificationChatType: diff --git a/internal/push/offlinepush/fcm/push.go b/internal/push/offlinepush/fcm/push.go index 8b880ae1f..a5294961d 100644 --- a/internal/push/offlinepush/fcm/push.go +++ b/internal/push/offlinepush/fcm/push.go @@ -95,7 +95,7 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, if err == nil { apns.Payload.Aps.Badge = &unreadCountSum } else { - //log.Error(operationID, "IncrUserBadgeUnreadCountSum redis err", err.Error(), uid) + // log.Error(operationID, "IncrUserBadgeUnreadCountSum redis err", err.Error(), uid) Fail++ continue } @@ -107,7 +107,7 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, zero := 1 apns.Payload.Aps.Badge = &zero } else { - //log.Error(operationID, "GetUserBadgeUnreadCountSum redis err", err.Error(), uid) + // log.Error(operationID, "GetUserBadgeUnreadCountSum redis err", err.Error(), uid) Fail++ continue } @@ -127,7 +127,7 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, response, err := f.fcmMsgCli.SendAll(ctx, messages) if err != nil { Fail = Fail + messageCount - //log.Info(operationID, "some token push err", err.Error(), messageCount) + // log.Info(operationID, "some token push err", err.Error(), messageCount) } else { Success = Success + response.SuccessCount Fail = Fail + response.FailureCount diff --git a/internal/push/offlinepush/getui/body.go b/internal/push/offlinepush/getui/body.go index bd0f7b07f..915c6f603 100644 --- a/internal/push/offlinepush/getui/body.go +++ b/internal/push/offlinepush/getui/body.go @@ -145,7 +145,7 @@ func newPushReq(title, content string) PushReq { } func newBatchPushReq(userIDs []string, taskID string) PushReq { - var IsAsync = true + IsAsync := true return PushReq{Audience: &Audience{Alias: userIDs}, IsAsync: &IsAsync, TaskID: &taskID} } diff --git a/internal/push/offlinepush/getui/push.go b/internal/push/offlinepush/getui/push.go index 2681aede6..ada5ac8b6 100644 --- a/internal/push/offlinepush/getui/push.go +++ b/internal/push/offlinepush/getui/push.go @@ -15,17 +15,16 @@ package getui import ( - "sync" - - "github.com/go-redis/redis" - "context" "crypto/sha256" "encoding/hex" "errors" "strconv" + "sync" "time" + "github.com/go-redis/redis" + "github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" @@ -49,7 +48,7 @@ const ( taskURL = "/push/list/message" batchPushURL = "/push/list/alias" - // codes + // codes. tokenExpireCode = 10001 tokenExpireTime = 60 * 60 * 23 taskIDTTL = 1000 * 60 * 60 * 24 @@ -142,7 +141,7 @@ func (g *Client) GetTaskID(ctx context.Context, token string, pushReq PushReq) ( return respTask.TaskID, nil } -// max num is 999 +// max num is 999. func (g *Client) batchPush(ctx context.Context, token string, userIDs []string, pushReq PushReq) error { taskID, err := g.GetTaskID(ctx, token, pushReq) if err != nil { diff --git a/internal/push/offlinepush/jpush/body/platform.go b/internal/push/offlinepush/jpush/body/platform.go index eae782c63..d15609b53 100644 --- a/internal/push/offlinepush/jpush/body/platform.go +++ b/internal/push/offlinepush/jpush/body/platform.go @@ -66,6 +66,7 @@ func (p *Platform) Set(os string) error { return nil } + func (p *Platform) SetPlatform(platform string) error { switch platform { case constant.AndroidPlatformStr: @@ -75,7 +76,7 @@ func (p *Platform) SetPlatform(platform string) error { default: return errors.New("platform err") } - +} } func (p *Platform) SetIOS() error { return p.Set(IOS) diff --git a/internal/push/offlinepush/jpush/body/pushobj.go b/internal/push/offlinepush/jpush/body/pushobj.go index 950f93777..c8c112f69 100644 --- a/internal/push/offlinepush/jpush/body/pushobj.go +++ b/internal/push/offlinepush/jpush/body/pushobj.go @@ -37,6 +37,7 @@ func (p *PushObj) SetNotification(no *Notification) { func (p *PushObj) SetMessage(m *Message) { p.Message = m } + func (p *PushObj) SetOptions(o *Options) { p.Options = o } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 86db1c762..d8c04ccb0 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -37,8 +37,10 @@ type ConsumerHandler struct { func NewConsumerHandler(pusher *Pusher) *ConsumerHandler { var consumerHandler ConsumerHandler consumerHandler.pusher = pusher - consumerHandler.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr, + consumerHandler.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{ + KafkaVersion: sarama.V2_0_0_0, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, + }, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToPush) return &consumerHandler } @@ -76,7 +78,8 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, - claim sarama.ConsumerGroupClaim) error { + claim sarama.ConsumerGroupClaim, +) error { for msg := range claim.Messages() { ctx := c.pushConsumerGroup.GetContextFromMsg(msg) c.handleMs2PsChat(ctx, msg.Value) diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index f12b4b4c8..0583b453d 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -54,7 +54,8 @@ var errNoOfflinePusher = errors.New("no offlinePusher is configured") func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache, - conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient) *Pusher { + conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient, +) *Pusher { return &Pusher{ discov: discov, database: database, @@ -223,7 +224,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws } needOfflinePushUserIDs = utils.DifferenceString(notNotificationUserIDs, needOfflinePushUserIDs) } - //Use offline push messaging + // Use offline push messaging if len(needOfflinePushUserIDs) > 0 { var offlinePushUserIDs []string err = callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs) @@ -254,7 +255,7 @@ func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, if err != nil { return nil, err } - //Online push message + // Online push message for _, v := range conns { msgClient := msggateway.NewMsgGatewayClient(v) reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs}) @@ -265,7 +266,6 @@ func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, if reply != nil && reply.SinglePushResult != nil { wsResults = append(wsResults, reply.SinglePushResult...) } - } return wsResults, nil } diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 5372d4220..d28623f39 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -184,7 +184,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbConver return &pbConversation.SetConversationsResp{}, nil } -// 获取超级大群开启免打扰的用户ID +// 获取超级大群开启免打扰的用户ID. func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbConversation.GetRecvMsgNotNotifyUserIDsReq) (*pbConversation.GetRecvMsgNotNotifyUserIDsResp, error) { userIDs, err := c.conversationDatabase.FindRecvMsgNotNotifyUserIDs(ctx, req.GroupID) if err != nil { @@ -193,7 +193,7 @@ func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req return &pbConversation.GetRecvMsgNotNotifyUserIDsResp{UserIDs: userIDs}, nil } -// create conversation without notification for msg redis transfer +// create conversation without notification for msg redis transfer. func (c *conversationServer) CreateSingleChatConversations(ctx context.Context, req *pbConversation.CreateSingleChatConversationsReq) (*pbConversation.CreateSingleChatConversationsResp, error) { var conversation tableRelation.ConversationModel conversation.ConversationID = utils.GetConversationIDBySessionType(constant.SingleChatType, req.RecvID, req.SendID) diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 4c528d1d1..f511305a4 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -83,7 +83,7 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { return nil } -// ok +// ok. func (s *friendServer) ApplyToAddFriend( ctx context.Context, req *pbfriend.ApplyToAddFriendReq, @@ -116,7 +116,7 @@ func (s *friendServer) ApplyToAddFriend( return resp, nil } -// ok +// ok. func (s *friendServer) ImportFriends( ctx context.Context, req *pbfriend.ImportFriendReq, @@ -142,7 +142,7 @@ func (s *friendServer) ImportFriends( return &pbfriend.ImportFriendResp{}, nil } -// ok +// ok. func (s *friendServer) RespondFriendApply( ctx context.Context, req *pbfriend.RespondFriendApplyReq, @@ -178,7 +178,7 @@ func (s *friendServer) RespondFriendApply( return nil, errs.ErrArgs.Wrap("req.HandleResult != -1/1") } -// ok +// ok. func (s *friendServer) DeleteFriend( ctx context.Context, req *pbfriend.DeleteFriendReq, @@ -199,7 +199,7 @@ func (s *friendServer) DeleteFriend( return resp, nil } -// ok +// ok. func (s *friendServer) SetFriendRemark( ctx context.Context, req *pbfriend.SetFriendRemarkReq, @@ -220,7 +220,7 @@ func (s *friendServer) SetFriendRemark( return resp, nil } -// ok +// ok. func (s *friendServer) GetDesignatedFriends( ctx context.Context, req *pbfriend.GetDesignatedFriendsReq, @@ -240,7 +240,7 @@ func (s *friendServer) GetDesignatedFriends( return resp, nil } -// ok 获取接收到的好友申请(即别人主动申请的) +// ok 获取接收到的好友申请(即别人主动申请的). func (s *friendServer) GetPaginationFriendsApplyTo( ctx context.Context, req *pbfriend.GetPaginationFriendsApplyToReq, @@ -263,7 +263,7 @@ func (s *friendServer) GetPaginationFriendsApplyTo( return resp, nil } -// ok 获取主动发出去的好友申请列表 +// ok 获取主动发出去的好友申请列表. func (s *friendServer) GetPaginationFriendsApplyFrom( ctx context.Context, req *pbfriend.GetPaginationFriendsApplyFromReq, @@ -286,7 +286,7 @@ func (s *friendServer) GetPaginationFriendsApplyFrom( return resp, nil } -// ok +// ok. func (s *friendServer) IsFriend( ctx context.Context, req *pbfriend.IsFriendReq, @@ -300,7 +300,7 @@ func (s *friendServer) IsFriend( return resp, nil } -// ok +// ok. func (s *friendServer) GetPaginationFriends( ctx context.Context, req *pbfriend.GetPaginationFriendsReq, diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 161e8b8f8..56c0284a6 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -17,14 +17,18 @@ package group import ( "context" "fmt" - pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" - "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/wrapperspb" "math/big" "math/rand" "strconv" "strings" "time" + pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/wrapperspb" + + pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/wrapperspb" + "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/convert" @@ -229,7 +233,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR } }() } else { - //s.Notification.GroupCreatedNotification(ctx, group, groupMembers, userMap) + // s.Notification.GroupCreatedNotification(ctx, group, groupMembers, userMap) tips := &sdkws.GroupCreatedTips{ Group: resp.GroupInfo, OperationTime: group.CreateTime.UnixMilli(), @@ -258,7 +262,7 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJo pageNumber = req.Pagination.PageNumber showNumber = req.Pagination.ShowNumber } - //total, members, err := s.GroupDatabase.PageGroupMember(ctx, nil, []string{req.FromUserID}, nil, pageNumber, showNumber) + // total, members, err := s.GroupDatabase.PageGroupMember(ctx, nil, []string{req.FromUserID}, nil, pageNumber, showNumber) total, members, err := s.GroupDatabase.PageGetJoinGroup(ctx, req.FromUserID, pageNumber, showNumber) if err != nil { return nil, err @@ -515,7 +519,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou Notification: group.Notification, Introduction: group.Introduction, FaceURL: group.FaceURL, - //OwnerUserID: owner[0].UserID, + // OwnerUserID: owner[0].UserID, CreateTime: group.CreateTime.UnixMilli(), MemberCount: num, Ex: group.Ex, @@ -900,7 +904,6 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInf }() num++ s.Notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}) - } switch len(data) - num { case 0: @@ -1104,7 +1107,7 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbGroup.DismissGrou if err != nil { return nil, err } - //s.Notification.GroupDismissedNotification(ctx, req) + // s.Notification.GroupDismissedNotification(ctx, req) tips := &sdkws.GroupDismissedTips{ Group: s.groupDB2PB(group, owner.UserID, num), OpUser: &sdkws.GroupMemberFullInfo{}, diff --git a/internal/rpc/group/statistics.go b/internal/rpc/group/statistics.go index 9b83a5ead..d638cb6fd 100644 --- a/internal/rpc/group/statistics.go +++ b/internal/rpc/group/statistics.go @@ -16,9 +16,10 @@ package group import ( "context" + "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group" - "time" ) func (s *groupServer) GroupCreateCount(ctx context.Context, req *group.GroupCreateCountReq) (*group.GroupCreateCountResp, error) { diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index 1176924d5..f83c2ad80 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -39,7 +39,7 @@ func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *m if err != nil { return nil, err } - var conversationMaxSeqMap = make(map[string]int64) + conversationMaxSeqMap := make(map[string]int64) for _, conversation := range conversations { if conversation.MaxSeq != 0 { conversationMaxSeqMap[conversation.ConversationID] = conversation.MaxSeq diff --git a/internal/rpc/msg/lock.go b/internal/rpc/msg/lock.go index 50224c6a3..dd99ab822 100644 --- a/internal/rpc/msg/lock.go +++ b/internal/rpc/msg/lock.go @@ -36,6 +36,7 @@ type LockerMessage struct { func NewLockerMessage(cache cache.MsgModel) *LockerMessage { return &LockerMessage{cache: cache} } + func (l *LockerMessage) LockMessageTypeKey(ctx context.Context, clientMsgID, typeKey string) (err error) { for i := 0; i < 3; i++ { err = l.cache.LockMessageTypeKey(ctx, clientMsgID, typeKey) @@ -47,7 +48,7 @@ func (l *LockerMessage) LockMessageTypeKey(ctx context.Context, clientMsgID, typ } } return err - +} } func (l *LockerMessage) LockGlobalMessage(ctx context.Context, clientMsgID string) (err error) { for i := 0; i < 3; i++ { @@ -60,11 +61,12 @@ func (l *LockerMessage) LockGlobalMessage(ctx context.Context, clientMsgID strin } } return err - +} } func (l *LockerMessage) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, typeKey string) error { return l.cache.UnLockMessageTypeKey(ctx, clientMsgID, typeKey) } + func (l *LockerMessage) UnLockGlobalMessage(ctx context.Context, clientMsgID string) error { return l.cache.UnLockMessageTypeKey(ctx, clientMsgID, GlOBALLOCK) } diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 671450eee..66d8eb1ae 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -84,6 +84,7 @@ func (m *msgServer) sendMsgSuperGroupChat( resp.ClientMsgID = req.MsgData.ClientMsgID return resp, nil } + func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgData) { log.ZDebug(nctx, "setConversationAtInfo", "msg", msg) ctx := mcontext.NewCtx("@@@" + mcontext.GetOperationID(nctx)) @@ -101,7 +102,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa return } atUserID = utils.DifferenceString([]string{constant.AtAllString}, msg.AtUserIDList) - if len(atUserID) == 0 { //just @everyone + if len(atUserID) == 0 { // just @everyone conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll} } else { //@Everyone and @other people conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAllAtMe} @@ -123,7 +124,6 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa log.ZWarn(ctx, "SetConversations", err, msg.AtUserIDList, conversation) } } - } func (m *msgServer) sendMsgNotification( diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index cf50dfb9f..08cff738b 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -16,6 +16,7 @@ package msg import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation" "google.golang.org/grpc" @@ -32,20 +33,22 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" ) -type MessageInterceptorChain []MessageInterceptorFunc -type msgServer struct { - RegisterCenter discoveryregistry.SvcDiscoveryRegistry - MsgDatabase controller.CommonMsgDatabase - Group *rpcclient.GroupRpcClient - User *rpcclient.UserRpcClient - Conversation *rpcclient.ConversationRpcClient - friend *rpcclient.FriendRpcClient - GroupLocalCache *localcache.GroupLocalCache - ConversationLocalCache *localcache.ConversationLocalCache - MessageLocker MessageLocker - Handlers MessageInterceptorChain - notificationSender *rpcclient.NotificationSender -} +type ( + MessageInterceptorChain []MessageInterceptorFunc + msgServer struct { + RegisterCenter discoveryregistry.SvcDiscoveryRegistry + MsgDatabase controller.CommonMsgDatabase + Group *rpcclient.GroupRpcClient + User *rpcclient.UserRpcClient + Conversation *rpcclient.ConversationRpcClient + friend *rpcclient.FriendRpcClient + GroupLocalCache *localcache.GroupLocalCache + ConversationLocalCache *localcache.ConversationLocalCache + MessageLocker MessageLocker + Handlers MessageInterceptorChain + notificationSender *rpcclient.NotificationSender + } +) func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) { m.Handlers = append(m.Handlers, interceptorFunc...) diff --git a/internal/rpc/msg/statistics.go b/internal/rpc/msg/statistics.go index 309534507..d3c0ecd8a 100644 --- a/internal/rpc/msg/statistics.go +++ b/internal/rpc/msg/statistics.go @@ -16,11 +16,12 @@ package msg import ( "context" + "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - "time" ) func (m *msgServer) GetActiveUser(ctx context.Context, req *msg.GetActiveUserReq) (*msg.GetActiveUserResp, error) { diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index 01510ffef..59b23dda5 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -16,6 +16,7 @@ package msg import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index be2945182..3a0276e28 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -28,9 +28,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" ) -var ( - ExcludeContentType = []int{constant.HasReadReceipt} -) +var ExcludeContentType = []int{constant.HasReadReceipt} type Validator interface { validate(pb *msg.SendMsgReq) (bool, int32, string) @@ -126,6 +124,7 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe return nil } } + func (m *msgServer) encapsulateMsgData(msg *sdkws.MsgData) { msg.ServerMsgID = GetMsgID(msg.SendID) msg.SendTime = utils.GetCurrentTimestampByMill() diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index 984fca94f..7873d4c54 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -16,6 +16,8 @@ package third import ( "context" + "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3/cont" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" @@ -23,7 +25,6 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/third" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - "time" ) func (t *thirdServer) PartLimit(ctx context.Context, req *third.PartLimitReq) (*third.PartLimitResp, error) { diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 99ddcb342..6b53d8b3e 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -17,12 +17,13 @@ package third import ( "context" "fmt" + "net/url" + "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3/cos" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3/minio" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3/oss" - "net/url" - "time" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" diff --git a/internal/rpc/third/tool.go b/internal/rpc/third/tool.go index 247f3e21d..e6b1f7e17 100644 --- a/internal/rpc/third/tool.go +++ b/internal/rpc/third/tool.go @@ -18,12 +18,13 @@ import ( "context" "errors" "fmt" + "strings" + "unicode/utf8" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/third" - "strings" - "unicode/utf8" ) func toPbMapArray(m map[string][]string) []*third.KeyValues { diff --git a/internal/tools/msg.go b/internal/tools/msg.go index 40539c757..cd714ee0e 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -48,7 +48,8 @@ type MsgTool struct { } func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase, - groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase, msgNotificationSender *notification.MsgNotificationSender) *MsgTool { + groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase, msgNotificationSender *notification.MsgNotificationSender, +) *MsgTool { return &MsgTool{ msgDatabase: msgDatabase, userDatabase: userDatabase, diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index ae5599891..f9b2b9783 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -31,7 +31,7 @@ type CallBackConfig struct { type NotificationConf struct { IsSendMsg bool `yaml:"isSendMsg"` - ReliabilityLevel int `yaml:"reliabilityLevel"` // 1 online 2 presistent + ReliabilityLevel int `yaml:"reliabilityLevel"` // 1 online 2 persistent UnreadCount bool `yaml:"unreadCount"` OfflinePush POfflinePush `yaml:"offlinePush"` } diff --git a/pkg/common/config/parse.go b/pkg/common/config/parse.go index 38f033ab4..60c4802a4 100644 --- a/pkg/common/config/parse.go +++ b/pkg/common/config/parse.go @@ -30,7 +30,7 @@ import ( var ( _, b, _, _ = runtime.Caller(0) - // Root folder of this project + // Root folder of this project. Root = filepath.Join(filepath.Dir(b), "../../..") ) diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index c9652984f..78a9435bf 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -45,7 +45,7 @@ const ( SignalMsg = 202 CustomNotification = 203 - //SysRelated. + // SysRelated. NotificationBegin = 1000 FriendApplicationApprovedNotification = 1201 // add_friend_response @@ -113,37 +113,37 @@ const ( NotificationEnd = 5000 - //status. + // status. MsgNormal = 1 MsgDeleted = 4 - //MsgFrom. + // MsgFrom. UserMsgType = 100 SysMsgType = 200 - //SessionType. + // SessionType. SingleChatType = 1 GroupChatType = 2 SuperGroupChatType = 3 NotificationChatType = 4 - //token. + // token. NormalToken = 0 InValidToken = 1 KickedToken = 2 ExpiredToken = 3 - //MultiTerminalLogin. + // MultiTerminalLogin. DefalutNotKick = 0 - //Full-end login, but the same end is mutually exclusive. + // Full-end login, but the same end is mutually exclusive. AllLoginButSameTermKick = 1 - //Only one of the endpoints can log in. + // Only one of the endpoints can log in. SingleTerminalLogin = 2 - //The web side can be online at the same time, and the other side can only log in at one end. + // The web side can be online at the same time, and the other side can only log in at one end. WebAndOther = 3 // The PC side is mutually exclusive, and the mobile side is mutually exclusive, but the web side can be online at // the same time. PcMobileAndWeb = 4 - //The PC terminal can be online at the same time,but other terminal only one of the endpoints can login. + // The PC terminal can be online at the same time,but other terminal only one of the endpoints can login. PCAndOther = 5 OnlineStatus = "online" @@ -151,12 +151,12 @@ const ( Registered = "registered" UnRegistered = "unregistered" - //MsgReceiveOpt. + // MsgReceiveOpt. ReceiveMessage = 0 NotReceiveMessage = 1 ReceiveNotNotifyMessage = 2 - //OptionsKey. + // OptionsKey. IsHistory = "history" IsPersistent = "persistent" IsOfflinePush = "offlinePush" @@ -170,13 +170,13 @@ const ( IsNotNotification = "isNotNotification" IsSendMsg = "isSendMsg" - //GroupStatus. + // GroupStatus. GroupOk = 0 GroupBanChat = 1 GroupStatusDismissed = 2 GroupStatusMuted = 3 - //GroupType. + // GroupType. NormalGroup = 0 SuperGroup = 1 WorkingGroup = 2 @@ -184,19 +184,19 @@ const ( GroupBaned = 3 GroupBanPrivateChat = 4 - //UserJoinGroupSource. + // UserJoinGroupSource. JoinByAdmin = 1 JoinByInvitation = 2 JoinBySearch = 3 JoinByQRCode = 4 - //Minio. + // Minio. MinioDurationTimes = 3600 - //Aws. + // Aws. AwsDurationTimes = 3600 - //callbackCommand. + // callbackCommand. CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand" CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand" CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand" @@ -217,10 +217,10 @@ const ( CallbackGetMessageListReactionExtensionsCommand = "callbackGetMessageListReactionExtensionsCommand" CallbackAddMessageListReactionExtensionsCommand = "callbackAddMessageListReactionExtensionsCommand" - //callback actionCode. + // callback actionCode. ActionAllow = 0 ActionForbidden = 1 - //callback callbackHandleCode. + // callback callbackHandleCode. CallbackHandleSuccess = 0 CallbackHandleFailed = 1 diff --git a/pkg/common/constant/platform_id_to_name.go b/pkg/common/constant/platform_id_to_name.go index 169e1e758..3bc819ed9 100644 --- a/pkg/common/constant/platform_id_to_name.go +++ b/pkg/common/constant/platform_id_to_name.go @@ -15,9 +15,9 @@ package constant // fixme 1<--->IOS 2<--->Android 3<--->Windows -// fixme 4<--->OSX 5<--->Web 6<--->MiniWeb 7<--->Linux +// fixme 4<--->OSX 5<--->Web 6<--->MiniWeb 7<--->Linux. const ( - //Platform ID. + // Platform ID. IOSPlatformID = 1 AndroidPlatformID = 2 WindowsPlatformID = 3 @@ -29,7 +29,7 @@ const ( IPadPlatformID = 9 AdminPlatformID = 10 - //Platform string match to Platform ID. + // Platform string match to Platform ID. IOSPlatformStr = "IOS" AndroidPlatformStr = "Android" WindowsPlatformStr = "Windows" @@ -41,7 +41,7 @@ const ( IPadPlatformStr = "IPad" AdminPlatformStr = "Admin" - //terminal types. + // terminal types. TerminalPC = "PC" TerminalMobile = "Mobile" ) diff --git a/pkg/common/convert/group.go b/pkg/common/convert/group.go index e8808d199..9ec812574 100644 --- a/pkg/common/convert/group.go +++ b/pkg/common/convert/group.go @@ -76,7 +76,7 @@ func Db2PbGroupMember(m *relation.GroupMemberModel) *sdkws.GroupMemberFullInfo { JoinTime: m.JoinTime.UnixMilli(), Nickname: m.Nickname, FaceURL: m.FaceURL, - //AppMangerLevel: m.AppMangerLevel, + // AppMangerLevel: m.AppMangerLevel, JoinSource: m.JoinSource, OperatorUserID: m.OperatorUserID, Ex: m.Ex, diff --git a/pkg/common/convert/msg.go b/pkg/common/convert/msg.go index 59215bb86..b937369d5 100644 --- a/pkg/common/convert/msg.go +++ b/pkg/common/convert/msg.go @@ -54,7 +54,6 @@ func MsgPb2DB(msg *sdkws.MsgData) *unrelation.MsgDataModel { msgDataModel.AttachedInfo = msg.AttachedInfo msgDataModel.Ex = msg.Ex return &msgDataModel - } func MsgDB2Pb(msgModel *unrelation.MsgDataModel) *sdkws.MsgData { diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index d6b82ef33..bd990a4e0 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -29,13 +29,13 @@ const ( blackExpireTime = time.Second * 60 * 60 * 12 ) -// args fn will exec when no data in msgCache +// args fn will exec when no data in msgCache. type BlackCache interface { - //get blackIDs from msgCache + // get blackIDs from msgCache metaCache NewCache() BlackCache GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) - //del user's blackIDs msgCache, exec when a user's black list changed + // del user's blackIDs msgCache, exec when a user's black list changed DelBlackIDs(ctx context.Context, userID string) BlackCache } diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index c35f0c240..d3a9d44aa 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -41,7 +41,7 @@ const ( conversationExpireTime = time.Second * 60 * 60 * 12 ) -// arg fn will exec when no data in msgCache +// arg fn will exec when no data in msgCache. type ConversationCache interface { metaCache NewCache() ConversationCache diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index 4695987dd..b15ed95d1 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -32,7 +32,7 @@ const ( friendKey = "FRIEND_INFO:" ) -// args fn will exec when no data in msgCache +// args fn will exec when no data in msgCache. type FriendCache interface { metaCache NewCache() FriendCache @@ -109,7 +109,7 @@ func (f *FriendCacheRedis) DelFriendIDs(ownerUserID ...string) FriendCache { return new } -// todo +// todo. func (f *FriendCacheRedis) GetTwoWayFriendIDs( ctx context.Context, ownerUserID string, diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 86adc2d0a..44ae04ecf 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -106,7 +106,8 @@ func NewGroupCacheRedis( opts rockscache.Options, ) GroupCache { rcClient := rockscache.NewClient(rdb, opts) - return &GroupCacheRedis{rcClient: rcClient, expireTime: groupExpireTime, + return &GroupCacheRedis{ + rcClient: rcClient, expireTime: groupExpireTime, groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, mongoDB: mongoClient, metaCache: NewMetaCacheRedis(rcClient), } @@ -176,7 +177,7 @@ func (g *GroupCacheRedis) GetGroupMemberIndex(groupMember *relationTb.GroupMembe return 0, errIndex } -// / groupInfo +// / groupInfo. func (g *GroupCacheRedis) GetGroupsInfo( ctx context.Context, groupIDs []string, @@ -265,7 +266,7 @@ func (g *GroupCacheRedis) GetSuperGroupMemberIDs( ) } -// userJoinSuperGroup +// userJoinSuperGroup. func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(userIDs ...string) GroupCache { new := g.NewCache() var keys []string @@ -286,7 +287,7 @@ func (g *GroupCacheRedis) DelSuperGroupMemberIDs(groupIDs ...string) GroupCache return new } -// groupMembersHash +// groupMembersHash. func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCode uint64, err error) { return getCache( ctx, @@ -331,7 +332,7 @@ func (g *GroupCacheRedis) DelGroupMembersHash(groupID string) GroupCache { return cache } -// groupMemberIDs +// groupMemberIDs. func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) { return getCache( ctx, diff --git a/pkg/common/db/cache/init_redis.go b/pkg/common/db/cache/init_redis.go index be0431adf..104d48ffd 100644 --- a/pkg/common/db/cache/init_redis.go +++ b/pkg/common/db/cache/init_redis.go @@ -28,10 +28,10 @@ import ( ) const ( - maxRetry = 10 //number of retries + maxRetry = 10 // number of retries ) -// NewRedis Initialize redis connection +// NewRedis Initialize redis connection. func NewRedis() (redis.UniversalClient, error) { if len(config.Config.Redis.Address) == 0 { return nil, errors.New("redis address is empty") diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 5f57103b0..179a0ee61 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -212,6 +212,7 @@ func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m func (c *msgCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { return c.getSeq(ctx, conversationID, c.getMaxSeqKey) } + func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey) } @@ -235,6 +236,7 @@ func (c *msgCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error func (c *msgCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey) } + func (c *msgCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { return c.getSeq(ctx, conversationID, c.getMinSeqKey) } @@ -359,7 +361,7 @@ func (c *msgCache) GetMessagesBySeq( ) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { pipe := c.rdb.Pipeline() for _, v := range seqs { - //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 + // MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 key := c.getMessageCacheKey(conversationID, v) if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil { return nil, nil, err diff --git a/pkg/common/db/controller/auth.go b/pkg/common/db/controller/auth.go index 454d9707f..678d98b11 100644 --- a/pkg/common/db/controller/auth.go +++ b/pkg/common/db/controller/auth.go @@ -26,9 +26,9 @@ import ( ) type AuthDatabase interface { - //结果为空 不返回错误 + // 结果为空 不返回错误 GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) - //创建token + // 创建token CreateToken(ctx context.Context, userID string, platformID int) (string, error) } @@ -43,7 +43,7 @@ func NewAuthDatabase(cache cache.MsgModel, accessSecret string, accessExpire int return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire} } -// 结果为空 不返回错误 +// 结果为空 不返回错误. func (a *authDatabase) GetTokensWithoutError( ctx context.Context, userID string, @@ -52,7 +52,7 @@ func (a *authDatabase) GetTokensWithoutError( return a.cache.GetTokensWithoutError(ctx, userID, platformID) } -// 创建token +// 创建token. func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) { tokens, err := a.cache.GetTokensWithoutError(ctx, userID, platformID) if err != nil { diff --git a/pkg/common/db/controller/black.go b/pkg/common/db/controller/black.go index 13b375787..99cf25ebd 100644 --- a/pkg/common/db/controller/black.go +++ b/pkg/common/db/controller/black.go @@ -48,7 +48,7 @@ func NewBlackDatabase(black relation.BlackModelInterface, cache cache.BlackCache return &blackDatabase{black, cache} } -// Create 增加黑名单 +// Create 增加黑名单. func (b *blackDatabase) Create(ctx context.Context, blacks []*relation.BlackModel) (err error) { if err := b.black.Create(ctx, blacks); err != nil { return err @@ -56,7 +56,7 @@ func (b *blackDatabase) Create(ctx context.Context, blacks []*relation.BlackMode return b.deleteBlackIDsCache(ctx, blacks) } -// Delete 删除黑名单 +// Delete 删除黑名单. func (b *blackDatabase) Delete(ctx context.Context, blacks []*relation.BlackModel) (err error) { if err := b.black.Delete(ctx, blacks); err != nil { return err @@ -72,7 +72,7 @@ func (b *blackDatabase) deleteBlackIDsCache(ctx context.Context, blacks []*relat return cache.ExecDel(ctx) } -// FindOwnerBlacks 获取黑名单列表 +// FindOwnerBlacks 获取黑名单列表. func (b *blackDatabase) FindOwnerBlacks( ctx context.Context, ownerUserID string, @@ -81,7 +81,7 @@ func (b *blackDatabase) FindOwnerBlacks( return b.black.FindOwnerBlacks(ctx, ownerUserID, pageNumber, showNumber) } -// CheckIn 检查user2是否在user1的黑名单列表中(inUser1Blacks==true) 检查user1是否在user2的黑名单列表中(inUser2Blacks==true) +// CheckIn 检查user2是否在user1的黑名单列表中(inUser1Blacks==true) 检查user1是否在user2的黑名单列表中(inUser2Blacks==true). func (b *blackDatabase) CheckIn( ctx context.Context, userID1, userID2 string, diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index 659d250c6..3cb4d1055 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -27,21 +27,21 @@ import ( ) type ConversationDatabase interface { - //UpdateUserConversationFiled 更新用户该会话的属性信息 + // UpdateUserConversationFiled 更新用户该会话的属性信息 UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error - //CreateConversation 创建一批新的会话 + // CreateConversation 创建一批新的会话 CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error - //SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作 + // SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作 SyncPeerUserPrivateConversationTx(ctx context.Context, conversation []*relationTb.ConversationModel) error - //FindConversations 根据会话ID获取某个用户的多个会话 + // FindConversations 根据会话ID获取某个用户的多个会话 FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) - //FindRecvMsgNotNotifyUserIDs 获取超级大群开启免打扰的用户ID + // FindRecvMsgNotNotifyUserIDs 获取超级大群开启免打扰的用户ID FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) - //GetUserAllConversation 获取一个用户在服务器上所有的会话 + // GetUserAllConversation 获取一个用户在服务器上所有的会话 GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) - //SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性 + // SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性 SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error - //SetUsersConversationFiledTx 设置多个用户会话关于某个字段的更新操作,如果会话不存在则创建,否则更新,内部保证事务操作 + // SetUsersConversationFiledTx 设置多个用户会话关于某个字段的更新操作,如果会话不存在则创建,否则更新,内部保证事务操作 SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error GetConversationIDs(ctx context.Context, userID string) ([]string, error) @@ -98,7 +98,6 @@ func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context, temp.OwnerUserID = v temp.CreateTime = now conversations = append(conversations, temp) - } if len(conversations) > 0 { err = conversationTx.Create(ctx, conversations) diff --git a/pkg/common/db/controller/friend.go b/pkg/common/db/controller/friend.go index 4d549efcf..6b41b87f2 100644 --- a/pkg/common/db/controller/friend.go +++ b/pkg/common/db/controller/friend.go @@ -93,7 +93,7 @@ func NewFriendDatabase( return &friendDatabase{friend: friend, friendRequest: friendRequest, cache: cache, tx: tx} } -// ok 检查user2是否在user1的好友列表中(inUser1Friends==true) 检查user1是否在user2的好友列表中(inUser2Friends==true) +// ok 检查user2是否在user1的好友列表中(inUser1Friends==true) 检查user1是否在user2的好友列表中(inUser2Friends==true). func (f *friendDatabase) CheckIn( ctx context.Context, userID1, userID2 string, @@ -109,7 +109,7 @@ func (f *friendDatabase) CheckIn( return utils.IsContain(userID2, userID1FriendIDs), utils.IsContain(userID1, userID2FriendIDs), nil } -// 增加或者更新好友申请 如果之前有记录则更新,没有记录则新增 +// 增加或者更新好友申请 如果之前有记录则更新,没有记录则新增. func (f *friendDatabase) AddFriendRequest( ctx context.Context, fromUserID, toUserID string, @@ -118,11 +118,11 @@ func (f *friendDatabase) AddFriendRequest( ) (err error) { return f.tx.Transaction(func(tx any) error { _, err := f.friendRequest.NewTx(tx).Take(ctx, fromUserID, toUserID) - //有db错误 + // 有db错误 if err != nil && errs.Unwrap(err) != gorm.ErrRecordNotFound { return err } - //无错误 则更新 + // 无错误 则更新 if err == nil { m := make(map[string]interface{}, 1) m["handle_result"] = 0 @@ -135,7 +135,7 @@ func (f *friendDatabase) AddFriendRequest( } return nil } - //gorm.ErrRecordNotFound 错误,则新增 + // gorm.ErrRecordNotFound 错误,则新增 if err := f.friendRequest.NewTx(tx).Create(ctx, []*relation.FriendRequestModel{{FromUserID: fromUserID, ToUserID: toUserID, ReqMsg: reqMsg, Ex: ex, CreateTime: time.Now(), HandleTime: time.Unix(0, 0)}}); err != nil { return err } @@ -143,7 +143,7 @@ func (f *friendDatabase) AddFriendRequest( }) } -// (1)先判断是否在好友表 (在不在都不返回错误) (2)对于不在好友列表的 插入即可 +// (1)先判断是否在好友表 (在不在都不返回错误) (2)对于不在好友列表的 插入即可. func (f *friendDatabase) BecomeFriends( ctx context.Context, ownerUserID string, @@ -152,7 +152,7 @@ func (f *friendDatabase) BecomeFriends( ) (err error) { cache := f.cache.NewCache() if err := f.tx.Transaction(func(tx any) error { - //先find 找出重复的 去掉重复的 + // 先find 找出重复的 去掉重复的 fs1, err := f.friend.NewTx(tx).FindFriends(ctx, ownerUserID, friendUserIDs) if err != nil { return err @@ -194,7 +194,7 @@ func (f *friendDatabase) BecomeFriends( return cache.ExecDel(ctx) } -// 拒绝好友申请 (1)检查是否有申请记录且为未处理状态 (没有记录返回错误) (2)修改申请记录 已拒绝 +// 拒绝好友申请 (1)检查是否有申请记录且为未处理状态 (没有记录返回错误) (2)修改申请记录 已拒绝. func (f *friendDatabase) RefuseFriendRequest( ctx context.Context, friendRequest *relation.FriendRequestModel, @@ -215,7 +215,7 @@ func (f *friendDatabase) RefuseFriendRequest( return nil } -// AgreeFriendRequest 同意好友申请 (1)检查是否有申请记录且为未处理状态 (没有记录返回错误) (2)检查是否好友(不返回错误) (3) 建立双向好友关系(存在的忽略) +// AgreeFriendRequest 同意好友申请 (1)检查是否有申请记录且为未处理状态 (没有记录返回错误) (2)检查是否好友(不返回错误) (3) 建立双向好友关系(存在的忽略). func (f *friendDatabase) AgreeFriendRequest( ctx context.Context, friendRequest *relation.FriendRequestModel, @@ -289,7 +289,7 @@ func (f *friendDatabase) AgreeFriendRequest( }) } -// 删除好友 外部判断是否好友关系 +// 删除好友 外部判断是否好友关系. func (f *friendDatabase) Delete(ctx context.Context, ownerUserID string, friendUserIDs []string) (err error) { if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil { return err @@ -297,7 +297,7 @@ func (f *friendDatabase) Delete(ctx context.Context, ownerUserID string, friendU return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx) } -// 更新好友备注 零值也支持 +// 更新好友备注 零值也支持. func (f *friendDatabase) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) (err error) { if err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark); err != nil { return err @@ -305,7 +305,7 @@ func (f *friendDatabase) UpdateRemark(ctx context.Context, ownerUserID, friendUs return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx) } -// 获取ownerUserID的好友列表 无结果不返回错误 +// 获取ownerUserID的好友列表 无结果不返回错误. func (f *friendDatabase) PageOwnerFriends( ctx context.Context, ownerUserID string, @@ -314,7 +314,7 @@ func (f *friendDatabase) PageOwnerFriends( return f.friend.FindOwnerFriends(ctx, ownerUserID, pageNumber, showNumber) } -// friendUserID在哪些人的好友列表中 +// friendUserID在哪些人的好友列表中. func (f *friendDatabase) PageInWhoseFriends( ctx context.Context, friendUserID string, @@ -323,7 +323,7 @@ func (f *friendDatabase) PageInWhoseFriends( return f.friend.FindInWhoseFriends(ctx, friendUserID, pageNumber, showNumber) } -// 获取我发出去的好友申请 无结果不返回错误 +// 获取我发出去的好友申请 无结果不返回错误. func (f *friendDatabase) PageFriendRequestFromMe( ctx context.Context, userID string, @@ -332,7 +332,7 @@ func (f *friendDatabase) PageFriendRequestFromMe( return f.friendRequest.FindFromUserID(ctx, userID, pageNumber, showNumber) } -// 获取我收到的的好友申请 无结果不返回错误 +// 获取我收到的的好友申请 无结果不返回错误. func (f *friendDatabase) PageFriendRequestToMe( ctx context.Context, userID string, @@ -341,7 +341,7 @@ func (f *friendDatabase) PageFriendRequestToMe( return f.friendRequest.FindToUserID(ctx, userID, pageNumber, showNumber) } -// 获取某人指定好友的信息 如果有好友不存在,也返回错误 +// 获取某人指定好友的信息 如果有好友不存在,也返回错误. func (f *friendDatabase) FindFriendsWithError( ctx context.Context, ownerUserID string, diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index 9010e3350..dff427580 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -203,7 +203,7 @@ func (g *groupDatabase) CreateGroup( groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel, ) error { - var cache = g.cache.NewCache() + cache := g.cache.NewCache() if err := g.tx.Transaction(func(tx any) error { if len(groups) > 0 { if err := g.groupDB.NewTx(tx).Create(ctx, groups); err != nil { @@ -473,7 +473,7 @@ func (g *groupDatabase) UpdateGroupMember( } func (g *groupDatabase) UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error { - var cache = g.cache.NewCache() + cache := g.cache.NewCache() if err := g.tx.Transaction(func(tx any) error { for _, item := range data { if err := g.groupMemberDB.NewTx(tx).Update(ctx, item.GroupID, item.UserID, item.Map); err != nil { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 2e5699db3..9a86f72bd 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -15,10 +15,13 @@ package controller import ( + "context" + "errors" + "time" + relation2 "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" "gorm.io/gorm" - "time" "github.com/redis/go-redis/v9" @@ -32,9 +35,6 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" - "context" - "errors" - pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" @@ -172,7 +172,7 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI return nil } num := db.msg.GetSingleGocMsgNum() - //num = 100 + // num = 100 for i, field := range fields { // 检查类型 var ok bool switch key { @@ -390,7 +390,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { - //log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) + // log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs) if err != nil { return nil, err @@ -637,7 +637,6 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string log.ZDebug(ctx, "deleteMsgRecursion finished", "conversationID", conversationID, "userID", userID, "index", index) break } - } } } @@ -652,7 +651,7 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string return seqs, nil } -// this is struct for recursion +// this is struct for recursion. type delMsgRecursionStruct struct { minSeq int64 delDocIDs []string @@ -665,7 +664,7 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 { // index 0....19(del) 20...69 // seq 70 // set minSeq 21 -// recursion 删除list并且返回设置的最小seq +// recursion 删除list并且返回设置的最小seq. func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { // find from oldest list msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) @@ -791,15 +790,19 @@ func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, u func (db *commonMsgDatabase) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { return db.cache.SetMaxSeq(ctx, conversationID, maxSeq) } + func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { return db.cache.GetMaxSeqs(ctx, conversationIDs) } + func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { return db.cache.GetMaxSeq(ctx, conversationID) } + func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { return db.cache.SetMinSeq(ctx, conversationID, minSeq) } + func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { return db.cache.SetMinSeqs(ctx, seqs) } @@ -807,18 +810,23 @@ func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int func (db *commonMsgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { return db.cache.GetMinSeqs(ctx, conversationIDs) } + func (db *commonMsgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { return db.cache.GetMinSeq(ctx, conversationID) } + func (db *commonMsgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { return db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) } + func (db *commonMsgDatabase) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) { return db.cache.GetConversationUserMinSeqs(ctx, conversationID, userIDs) } + func (db *commonMsgDatabase) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error { return db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, minSeq) } + func (db *commonMsgDatabase) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) { return db.cache.SetConversationUserMinSeqs(ctx, conversationID, seqs) } @@ -834,6 +842,7 @@ func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID stri func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { return db.cache.SetHasReadSeq(ctx, userID, conversationID, hasReadSeq) } + func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { return db.cache.GetHasReadSeqs(ctx, userID, conversationIDs) } diff --git a/pkg/common/db/controller/s3.go b/pkg/common/db/controller/s3.go index ecc438ef2..cdd1ac4c2 100644 --- a/pkg/common/db/controller/s3.go +++ b/pkg/common/db/controller/s3.go @@ -16,11 +16,12 @@ package controller import ( "context" + "path/filepath" + "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3/cont" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" - "path/filepath" - "time" ) type S3Database interface { diff --git a/pkg/common/db/controller/user.go b/pkg/common/db/controller/user.go index fe14f0a1c..e15033a62 100644 --- a/pkg/common/db/controller/user.go +++ b/pkg/common/db/controller/user.go @@ -26,19 +26,19 @@ import ( ) type UserDatabase interface { - //获取指定用户的信息 如有userID未找到 也返回错误 + // 获取指定用户的信息 如有userID未找到 也返回错误 FindWithError(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) - //获取指定用户的信息 如有userID未找到 不返回错误 + // 获取指定用户的信息 如有userID未找到 不返回错误 Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) - //插入多条 外部保证userID 不重复 且在db中不存在 + // 插入多条 外部保证userID 不重复 且在db中不存在 Create(ctx context.Context, users []*relation.UserModel) (err error) - //更新(非零值) 外部保证userID存在 + // 更新(非零值) 外部保证userID存在 Update(ctx context.Context, user *relation.UserModel) (err error) - //更新(零值) 外部保证userID存在 + // 更新(零值) 外部保证userID存在 UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) - //如果没找到,不返回错误 + // 如果没找到,不返回错误 Page(ctx context.Context, pageNumber, showNumber int32) (users []*relation.UserModel, count int64, err error) - //只要有一个存在就为true + // 只要有一个存在就为true IsExist(ctx context.Context, userIDs []string) (exist bool, err error) //获取所有用户ID GetAllUserID(ctx context.Context, pageNumber, showNumber int32) ([]string, error) @@ -75,7 +75,7 @@ func (u *userDatabase) InitOnce(ctx context.Context, users []*relation.UserModel return nil } -// 获取指定用户的信息 如有userID未找到 也返回错误 +// 获取指定用户的信息 如有userID未找到 也返回错误. func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) { users, err = u.cache.GetUsersInfo(ctx, userIDs) if err != nil { @@ -87,13 +87,13 @@ func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (use return } -// 获取指定用户的信息 如有userID未找到 不返回错误 +// 获取指定用户的信息 如有userID未找到 不返回错误. func (u *userDatabase) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) { users, err = u.cache.GetUsersInfo(ctx, userIDs) return } -// 插入多条 外部保证userID 不重复 且在db中不存在 +// 插入多条 外部保证userID 不重复 且在db中不存在. func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) { if err := u.tx.Transaction(func(tx any) error { err = u.userDB.Create(ctx, users) @@ -111,7 +111,7 @@ func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx) } -// 更新(非零值) 外部保证userID存在 +// 更新(非零值) 外部保证userID存在. func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (err error) { if err := u.userDB.Update(ctx, user); err != nil { return err @@ -119,7 +119,7 @@ func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (er return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx) } -// 更新(零值) 外部保证userID存在 +// 更新(零值) 外部保证userID存在. func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) { if err := u.userDB.UpdateByMap(ctx, userID, args); err != nil { return err @@ -127,7 +127,7 @@ func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[ return u.cache.DelUsersInfo(userID).ExecDel(ctx) } -// 获取,如果没找到,不返回错误 +// 获取,如果没找到,不返回错误. func (u *userDatabase) Page( ctx context.Context, pageNumber, showNumber int32, @@ -135,7 +135,7 @@ func (u *userDatabase) Page( return u.userDB.Page(ctx, pageNumber, showNumber) } -// userIDs是否存在 只要有一个存在就为true +// userIDs是否存在 只要有一个存在就为true. func (u *userDatabase) IsExist(ctx context.Context, userIDs []string) (exist bool, err error) { users, err := u.userDB.Find(ctx, userIDs) if err != nil { diff --git a/pkg/common/db/localcache/conversation.go b/pkg/common/db/localcache/conversation.go index de9f0401d..36f82b93e 100644 --- a/pkg/common/db/localcache/conversation.go +++ b/pkg/common/db/localcache/conversation.go @@ -76,5 +76,4 @@ func (g *ConversationLocalCache) GetConversationIDs(ctx context.Context, userID return conversationIDsResp.ConversationIDs, nil } return hash.ids, nil - } diff --git a/pkg/common/db/relation/friend_model.go b/pkg/common/db/relation/friend_model.go index 5a54f9b7c..10daf8e80 100644 --- a/pkg/common/db/relation/friend_model.go +++ b/pkg/common/db/relation/friend_model.go @@ -35,12 +35,12 @@ func (f *FriendGorm) NewTx(tx any) relation.FriendModelInterface { return &FriendGorm{NewMetaDB(tx.(*gorm.DB), &relation.FriendModel{})} } -// 插入多条记录 +// 插入多条记录. func (f *FriendGorm) Create(ctx context.Context, friends []*relation.FriendModel) (err error) { return utils.Wrap(f.db(ctx).Create(&friends).Error, "") } -// 删除ownerUserID指定的好友 +// 删除ownerUserID指定的好友. func (f *FriendGorm) Delete(ctx context.Context, ownerUserID string, friendUserIDs []string) (err error) { err = utils.Wrap( f.db(ctx). @@ -52,7 +52,7 @@ func (f *FriendGorm) Delete(ctx context.Context, ownerUserID string, friendUserI return err } -// 更新ownerUserID单个好友信息 更新零值 +// 更新ownerUserID单个好友信息 更新零值. func (f *FriendGorm) UpdateByMap( ctx context.Context, ownerUserID string, @@ -65,12 +65,12 @@ func (f *FriendGorm) UpdateByMap( ) } -// 更新好友信息的非零值 +// 更新好友信息的非零值. func (f *FriendGorm) Update(ctx context.Context, friends []*relation.FriendModel) (err error) { return utils.Wrap(f.db(ctx).Updates(&friends).Error, "") } -// 更新好友备注(也支持零值 ) +// 更新好友备注(也支持零值 ). func (f *FriendGorm) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) (err error) { if remark != "" { return utils.Wrap( @@ -86,7 +86,7 @@ func (f *FriendGorm) UpdateRemark(ctx context.Context, ownerUserID, friendUserID return utils.Wrap(f.db(ctx).Where("owner_user_id = ?", ownerUserID).Updates(m).Error, "") } -// 获取单个好友信息,如没找到 返回错误 +// 获取单个好友信息,如没找到 返回错误. func (f *FriendGorm) Take( ctx context.Context, ownerUserID, friendUserID string, @@ -98,7 +98,7 @@ func (f *FriendGorm) Take( ) } -// 查找好友关系,如果是双向关系,则都返回 +// 查找好友关系,如果是双向关系,则都返回. func (f *FriendGorm) FindUserState( ctx context.Context, userID1, userID2 string, @@ -112,7 +112,7 @@ func (f *FriendGorm) FindUserState( ) } -// 获取 owner指定的好友列表 如果有friendUserIDs不存在,也不返回错误 +// 获取 owner指定的好友列表 如果有friendUserIDs不存在,也不返回错误. func (f *FriendGorm) FindFriends( ctx context.Context, ownerUserID string, @@ -124,7 +124,7 @@ func (f *FriendGorm) FindFriends( ) } -// 获取哪些人添加了friendUserID 如果有ownerUserIDs不存在,也不返回错误 +// 获取哪些人添加了friendUserID 如果有ownerUserIDs不存在,也不返回错误. func (f *FriendGorm) FindReversalFriends( ctx context.Context, friendUserID string, @@ -136,7 +136,7 @@ func (f *FriendGorm) FindReversalFriends( ) } -// 获取ownerUserID好友列表 支持翻页 +// 获取ownerUserID好友列表 支持翻页. func (f *FriendGorm) FindOwnerFriends( ctx context.Context, ownerUserID string, @@ -158,7 +158,7 @@ func (f *FriendGorm) FindOwnerFriends( return } -// 获取哪些人添加了friendUserID 支持翻页 +// 获取哪些人添加了friendUserID 支持翻页. func (f *FriendGorm) FindInWhoseFriends( ctx context.Context, friendUserID string, diff --git a/pkg/common/db/relation/friend_request_model.go b/pkg/common/db/relation/friend_request_model.go index 06fd08ff6..fe08ac06c 100644 --- a/pkg/common/db/relation/friend_request_model.go +++ b/pkg/common/db/relation/friend_request_model.go @@ -35,12 +35,12 @@ func (f *FriendRequestGorm) NewTx(tx any) relation.FriendRequestModelInterface { return &FriendRequestGorm{NewMetaDB(tx.(*gorm.DB), &relation.FriendRequestModel{})} } -// 插入多条记录 +// 插入多条记录. func (f *FriendRequestGorm) Create(ctx context.Context, friendRequests []*relation.FriendRequestModel) (err error) { return utils.Wrap(f.db(ctx).Create(&friendRequests).Error, "") } -// 删除记录 +// 删除记录. func (f *FriendRequestGorm) Delete(ctx context.Context, fromUserID, toUserID string) (err error) { return utils.Wrap( f.db(ctx). @@ -51,7 +51,7 @@ func (f *FriendRequestGorm) Delete(ctx context.Context, fromUserID, toUserID str ) } -// 更新零值 +// 更新零值. func (f *FriendRequestGorm) UpdateByMap( ctx context.Context, fromUserID string, @@ -68,7 +68,7 @@ func (f *FriendRequestGorm) UpdateByMap( ) } -// 更新记录 (非零值) +// 更新记录 (非零值). func (f *FriendRequestGorm) Update(ctx context.Context, friendRequest *relation.FriendRequestModel) (err error) { return utils.Wrap( f.db(ctx). @@ -79,7 +79,7 @@ func (f *FriendRequestGorm) Update(ctx context.Context, friendRequest *relation. ) } -// 获取来指定用户的好友申请 未找到 不返回错误 +// 获取来指定用户的好友申请 未找到 不返回错误. func (f *FriendRequestGorm) Find( ctx context.Context, fromUserID, toUserID string, @@ -104,7 +104,7 @@ func (f *FriendRequestGorm) Take( return friendRequest, err } -// 获取toUserID收到的好友申请列表 +// 获取toUserID收到的好友申请列表. func (f *FriendRequestGorm) FindToUserID( ctx context.Context, toUserID string, @@ -126,7 +126,7 @@ func (f *FriendRequestGorm) FindToUserID( return } -// 获取fromUserID发出去的好友申请列表 +// 获取fromUserID发出去的好友申请列表. func (f *FriendRequestGorm) FindFromUserID( ctx context.Context, fromUserID string, diff --git a/pkg/common/db/relation/group_model.go b/pkg/common/db/relation/group_model.go index 767f7db84..1bebe8b49 100644 --- a/pkg/common/db/relation/group_model.go +++ b/pkg/common/db/relation/group_model.go @@ -16,9 +16,11 @@ package relation import ( "context" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "time" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" + "gorm.io/gorm" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/ormutil" @@ -67,6 +69,7 @@ func (g *GroupGorm) Search(ctx context.Context, keyword string, pageNumber, show db = db.WithContext(ctx).Where("status!=?", constant.GroupStatusDismissed) return ormutil.GormSearch[relation.GroupModel](db, []string{"name"}, keyword, pageNumber, showNumber) } + func (g *GroupGorm) GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error) { return groupIDs, utils.Wrap(g.DB.Model(&relation.GroupModel{}).Where("group_type = ? ", groupType).Pluck("group_id", &groupIDs).Error, "") } diff --git a/pkg/common/db/relation/mysql_init.go b/pkg/common/db/relation/mysql_init.go index 4a6758cba..267acce74 100644 --- a/pkg/common/db/relation/mysql_init.go +++ b/pkg/common/db/relation/mysql_init.go @@ -31,10 +31,10 @@ import ( ) const ( - maxRetry = 100 //number of retries + maxRetry = 100 // number of retries ) -// newMysqlGormDB Initialize the database connection +// newMysqlGormDB Initialize the database connection. func newMysqlGormDB() (*gorm.DB, error) { dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", config.Config.Mysql.Username, config.Config.Mysql.Password, config.Config.Mysql.Address[0], "mysql") @@ -84,7 +84,7 @@ func newMysqlGormDB() (*gorm.DB, error) { return db, nil } -// connectToDatabase Connection retry for mysql +// connectToDatabase Connection retry for mysql. func connectToDatabase(dsn string, maxRetry int) (*gorm.DB, error) { var db *gorm.DB var err error @@ -101,7 +101,7 @@ func connectToDatabase(dsn string, maxRetry int) (*gorm.DB, error) { return nil, err } -// NewGormDB gorm mysql +// NewGormDB gorm mysql. func NewGormDB() (*gorm.DB, error) { specialerror.AddReplace(gorm.ErrRecordNotFound, errs.ErrRecordNotFound) specialerror.AddErrHandler(replaceDuplicateKey) diff --git a/pkg/common/db/relation/user_model.go b/pkg/common/db/relation/user_model.go index 3da877c19..6e095f55f 100644 --- a/pkg/common/db/relation/user_model.go +++ b/pkg/common/db/relation/user_model.go @@ -34,35 +34,35 @@ func NewUserGorm(db *gorm.DB) relation.UserModelInterface { return &UserGorm{NewMetaDB(db, &relation.UserModel{})} } -// 插入多条 +// 插入多条. func (u *UserGorm) Create(ctx context.Context, users []*relation.UserModel) (err error) { return utils.Wrap(u.db(ctx).Create(&users).Error, "") } -// 更新用户信息 零值 +// 更新用户信息 零值. func (u *UserGorm) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) { return utils.Wrap(u.db(ctx).Model(&relation.UserModel{}).Where("user_id = ?", userID).Updates(args).Error, "") } -// 更新多个用户信息 非零值 +// 更新多个用户信息 非零值. func (u *UserGorm) Update(ctx context.Context, user *relation.UserModel) (err error) { return utils.Wrap(u.db(ctx).Model(user).Updates(user).Error, "") } -// 获取指定用户信息 不存在,也不返回错误 +// 获取指定用户信息 不存在,也不返回错误. func (u *UserGorm) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) { err = utils.Wrap(u.db(ctx).Where("user_id in (?)", userIDs).Find(&users).Error, "") return users, err } -// 获取某个用户信息 不存在,则返回错误 +// 获取某个用户信息 不存在,则返回错误. func (u *UserGorm) Take(ctx context.Context, userID string) (user *relation.UserModel, err error) { user = &relation.UserModel{} err = utils.Wrap(u.db(ctx).Where("user_id = ?", userID).Take(&user).Error, "") return user, err } -// 获取用户信息 不存在,不返回错误 +// 获取用户信息 不存在,不返回错误. func (u *UserGorm) Page( ctx context.Context, pageNumber, showNumber int32, diff --git a/pkg/common/db/s3/cont/controller.go b/pkg/common/db/s3/cont/controller.go index 0ab700e21..b932f65b5 100644 --- a/pkg/common/db/s3/cont/controller.go +++ b/pkg/common/db/s3/cont/controller.go @@ -20,13 +20,14 @@ import ( "encoding/hex" "errors" "fmt" + "path" + "strings" + "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/google/uuid" - "path" - "strings" - "time" ) func New(impl s3.Interface) *Controller { diff --git a/pkg/common/db/s3/cont/error.go b/pkg/common/db/s3/cont/error.go index af09779ee..be77bcb9e 100644 --- a/pkg/common/db/s3/cont/error.go +++ b/pkg/common/db/s3/cont/error.go @@ -16,6 +16,7 @@ package cont import ( "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3" ) diff --git a/pkg/common/db/s3/cos/cos.go b/pkg/common/db/s3/cos/cos.go index c3a2b104c..4064ba181 100644 --- a/pkg/common/db/s3/cos/cos.go +++ b/pkg/common/db/s3/cos/cos.go @@ -18,14 +18,15 @@ import ( "context" "errors" "fmt" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3" - "github.com/tencentyun/cos-go-sdk-v5" "net/http" "net/url" "strconv" "strings" "time" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3" + "github.com/tencentyun/cos-go-sdk-v5" ) const ( diff --git a/pkg/common/db/s3/minio/minio.go b/pkg/common/db/s3/minio/minio.go index 48d192420..2f80c67bb 100644 --- a/pkg/common/db/s3/minio/minio.go +++ b/pkg/common/db/s3/minio/minio.go @@ -18,16 +18,12 @@ import ( "context" "errors" "fmt" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3" - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" - "github.com/minio/minio-go/v7/pkg/signer" "net/http" "net/url" "strconv" "strings" "time" + ) const ( diff --git a/pkg/common/db/s3/oss/oss.go b/pkg/common/db/s3/oss/oss.go index 1b5745fe1..abb3f10d7 100644 --- a/pkg/common/db/s3/oss/oss.go +++ b/pkg/common/db/s3/oss/oss.go @@ -18,14 +18,15 @@ import ( "context" "errors" "fmt" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3" - "github.com/aliyun/aliyun-oss-go-sdk/oss" "net/http" "net/url" "strconv" "strings" "time" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3" + "github.com/aliyun/aliyun-oss-go-sdk/oss" ) const ( diff --git a/pkg/common/db/s3/oss/sign.go b/pkg/common/db/s3/oss/sign.go index a272a64c5..9811ac476 100644 --- a/pkg/common/db/s3/oss/sign.go +++ b/pkg/common/db/s3/oss/sign.go @@ -19,12 +19,13 @@ import ( "crypto/sha1" "crypto/sha256" "encoding/base64" - "github.com/aliyun/aliyun-oss-go-sdk/oss" "hash" "io" "net/http" "sort" "strings" + + "github.com/aliyun/aliyun-oss-go-sdk/oss" ) func (o *OSS) getAdditionalHeaderKeys(req *http.Request) ([]string, map[string]string) { diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 3a8725f36..a4eab0142 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -16,10 +16,13 @@ package unrelation import ( "context" - "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "strconv" "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" + "go.mongodb.org/mongo-driver/mongo" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index 9ce9929c5..9c9f7db3b 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -32,14 +32,14 @@ import ( ) const ( - maxRetry = 10 //number of retries + maxRetry = 10 // number of retries ) type Mongo struct { db *mongo.Client } -// NewMongo Initialize MongoDB connection +// NewMongo Initialize MongoDB connection. func NewMongo() (*Mongo, error) { specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound) uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority" diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index 5d26d45b2..fb889aae4 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -19,9 +19,11 @@ import ( "encoding/json" "errors" "fmt" - "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "time" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "go.mongodb.org/mongo-driver/bson" @@ -80,7 +82,7 @@ func (m *MsgMongoDriver) UpdateMsg( return res, nil } -// PushUnique value must slice +// PushUnique value must slice. func (m *MsgMongoDriver) PushUnique( ctx context.Context, docID string, @@ -555,7 +557,7 @@ func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead( // } // } // -// ]) +// ]). func (m *MsgMongoDriver) RangeUserSendCount( ctx context.Context, start time.Time, @@ -1081,21 +1083,21 @@ func (m *MsgMongoDriver) SearchMessage(ctx context.Context, req *msg.SearchMessa func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessageReq) ([]*table.MsgInfoModel, error) { var pipe mongo.Pipeline - conditon := bson.A{} + condition := bson.A{} if req.SendTime != "" { - conditon = append(conditon, bson.M{"$eq": bson.A{bson.M{"$dateToString": bson.M{"format": "%Y-%m-%d", "date": bson.M{"$toDate": "$$item.msg.send_time"}}}, req.SendTime}}) + condition = append(condition, bson.M{"$eq": bson.A{bson.M{"$dateToString": bson.M{"format": "%Y-%m-%d", "date": bson.M{"$toDate": "$$item.msg.send_time"}}}, req.SendTime}}) } if req.MsgType != 0 { - conditon = append(conditon, bson.M{"$eq": bson.A{"$$item.msg.content_type", req.MsgType}}) + condition = append(condition, bson.M{"$eq": bson.A{"$$item.msg.content_type", req.MsgType}}) } if req.SessionType != 0 { - conditon = append(conditon, bson.M{"$eq": bson.A{"$$item.msg.session_type", req.SessionType}}) + condition = append(condition, bson.M{"$eq": bson.A{"$$item.msg.session_type", req.SessionType}}) } if req.RecvID != "" { - conditon = append(conditon, bson.M{"$regexFind": bson.M{"input": "$$item.msg.recv_id", "regex": req.RecvID}}) + condition = append(condition, bson.M{"$regexFind": bson.M{"input": "$$item.msg.recv_id", "regex": req.RecvID}}) } if req.SendID != "" { - conditon = append(conditon, bson.M{"$regexFind": bson.M{"input": "$$item.msg.send_id", "regex": req.SendID}}) + condition = append(condition, bson.M{"$regexFind": bson.M{"input": "$$item.msg.send_id", "regex": req.SendID}}) } or := bson.A{ @@ -1131,15 +1133,20 @@ func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessa }, { {"$project", bson.D{ - {"msgs", bson.D{ - {"$filter", bson.D{ - {"input", "$msgs"}, - {"as", "item"}, - {"cond", bson.D{ - {"$and", conditon}, + { + "msgs", bson.D{ + { + "$filter", bson.D{ + {"input", "$msgs"}, + {"as", "item"}, + { + "cond", bson.D{ + {"$and", conditon}, + }, + }, + }, }, - }}, - }}, + }, }, {"doc_id", 1}, }}, @@ -1159,7 +1166,7 @@ func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessa return nil, errs.Wrap(mongo.ErrNoDocuments) } msgs := make([]*table.MsgInfoModel, 0) - for index, _ := range msgsDocs { + for index := range msgsDocs { for i := range msgsDocs[index].Msg { msg := msgsDocs[index].Msg[i] if msg == nil || msg.Msg == nil { diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 0f801e8b2..60e69d4a6 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -52,7 +52,6 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context { return GetContextWithMQHeader(cMsg.Headers) - } func (mc *MConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupHandler) { diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index a749c76f8..d52d76817 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -17,12 +17,13 @@ package kafka import ( "context" "errors" + "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" log "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - "time" "github.com/Shopify/sarama" "google.golang.org/protobuf/proto" @@ -31,7 +32,7 @@ import ( ) const ( - maxRetry = 10 //number of retries + maxRetry = 10 // number of retries ) var errEmptyMsg = errors.New("binary msg is empty") @@ -43,14 +44,14 @@ type Producer struct { producer sarama.SyncProducer } -// NewKafkaProducer Initialize kafka producer +// NewKafkaProducer Initialize kafka producer. func NewKafkaProducer(addr []string, topic string) *Producer { p := Producer{} - p.config = sarama.NewConfig() //Instantiate a sarama Config - p.config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully + p.config = sarama.NewConfig() // Instantiate a sarama Config + p.config.Producer.Return.Successes = true // Whether to enable the successes channel to be notified after the message is sent successfully p.config.Producer.Return.Errors = true - p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all - p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly + p.config.Producer.RequiredAcks = sarama.WaitForAll // Set producer Message Reply level 0 1 all + p.config.Producer.Partitioner = sarama.NewHashPartitioner // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" { p.config.Net.SASL.Enable = true p.config.Net.SASL.User = config.Config.Kafka.Username @@ -61,7 +62,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer { var producer sarama.SyncProducer var err error for i := 0; i <= maxRetry; i++ { - producer, err = sarama.NewSyncProducer(p.addr, p.config) //Initialize the client + producer, err = sarama.NewSyncProducer(p.addr, p.config) // Initialize the client if err == nil { p.producer = producer return &p @@ -92,7 +93,8 @@ func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {Key: []byte(constant.OperationID), Value: []byte(operationID)}, {Key: []byte(constant.OpUserID), Value: []byte(opUserID)}, {Key: []byte(constant.OpUserPlatform), Value: []byte(platform)}, - {Key: []byte(constant.ConnID), Value: []byte(connID)}}, err + {Key: []byte(constant.ConnID), Value: []byte(connID)}, + }, err } func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context { diff --git a/pkg/common/log/zap.go b/pkg/common/log/zap.go index 9cf3fe144..9d57acf85 100644 --- a/pkg/common/log/zap.go +++ b/pkg/common/log/zap.go @@ -45,7 +45,7 @@ var ( } ) -// InitFromConfig initializes a Zap-based logger +// InitFromConfig initializes a Zap-based logger. func InitFromConfig( loggerPrefixName, moduleName string, logLevel int, @@ -183,7 +183,6 @@ func (l *ZapLogger) customCallerEncoder(caller zapcore.EntryCaller, enc zapcore. // color = _levelToColor[zapcore.ErrorLevel] // } enc.AppendString(s) - } func (l *ZapLogger) timeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) { diff --git a/pkg/common/mw/gin.go b/pkg/common/mw/gin.go index a544c2a25..287cd5f53 100644 --- a/pkg/common/mw/gin.go +++ b/pkg/common/mw/gin.go @@ -53,7 +53,7 @@ func CorsHandler() gin.HandlerFunc { "content-type", "application/json", ) // Set the return format to json. - //Release all option pre-requests + // Release all option pre-requests if c.Request.Method == http.MethodOptions { c.JSON(http.StatusOK, "Options Request!") c.Abort() diff --git a/pkg/common/prome/gather.go b/pkg/common/prome/gather.go index 910df71d9..a6a41f0ca 100644 --- a/pkg/common/prome/gather.go +++ b/pkg/common/prome/gather.go @@ -20,17 +20,17 @@ import ( ) var ( - //auth rpc + //auth rpc. UserLoginCounter prometheus.Counter UserRegisterCounter prometheus.Counter - //seg + //seg. SeqGetSuccessCounter prometheus.Counter SeqGetFailedCounter prometheus.Counter SeqSetSuccessCounter prometheus.Counter SeqSetFailedCounter prometheus.Counter - //msg-db + //msg-db. MsgInsertRedisSuccessCounter prometheus.Counter MsgInsertRedisFailedCounter prometheus.Counter MsgInsertMongoSuccessCounter prometheus.Counter @@ -40,7 +40,7 @@ var ( MsgPullFromMongoSuccessCounter prometheus.Counter MsgPullFromMongoFailedCounter prometheus.Counter - //msg-ws + //msg-ws. MsgRecvTotalCounter prometheus.Counter GetNewestSeqTotalCounter prometheus.Counter PullMsgBySeqListTotalCounter prometheus.Counter @@ -50,7 +50,7 @@ var ( WorkSuperGroupChatMsgRecvSuccessCounter prometheus.Counter OnlineUserGauge prometheus.Gauge - //msg-msg + //msg-msg. SingleChatMsgProcessSuccessCounter prometheus.Counter SingleChatMsgProcessFailedCounter prometheus.Counter GroupChatMsgProcessSuccessCounter prometheus.Counter @@ -58,23 +58,23 @@ var ( WorkSuperGroupChatMsgProcessSuccessCounter prometheus.Counter WorkSuperGroupChatMsgProcessFailedCounter prometheus.Counter - //msg-push + //msg-push. MsgOnlinePushSuccessCounter prometheus.Counter MsgOfflinePushSuccessCounter prometheus.Counter MsgOfflinePushFailedCounter prometheus.Counter - // api + // api. ApiRequestCounter prometheus.Counter ApiRequestSuccessCounter prometheus.Counter ApiRequestFailedCounter prometheus.Counter - // grpc + // grpc. GrpcRequestCounter prometheus.Counter GrpcRequestSuccessCounter prometheus.Counter GrpcRequestFailedCounter prometheus.Counter SendMsgCounter prometheus.Counter - // conversation + // conversation. ConversationCreateSuccessCounter prometheus.Counter ConversationCreateFailedCounter prometheus.Counter ) @@ -88,6 +88,7 @@ func NewUserLoginCounter() { Help: "The number of user login", }) } + func NewUserRegisterCounter() { if UserRegisterCounter != nil { return @@ -107,6 +108,7 @@ func NewSeqGetSuccessCounter() { Help: "The number of successful get seq", }) } + func NewSeqGetFailedCounter() { if SeqGetFailedCounter != nil { return @@ -126,6 +128,7 @@ func NewSeqSetSuccessCounter() { Help: "The number of successful set seq", }) } + func NewSeqSetFailedCounter() { if SeqSetFailedCounter != nil { return @@ -305,6 +308,7 @@ func NewGetNewestSeqTotalCounter() { Help: "the number of get newest seq", }) } + func NewPullMsgBySeqListTotalCounter() { if PullMsgBySeqListTotalCounter != nil { return @@ -404,6 +408,7 @@ func NewWorkSuperGroupChatMsgProcessSuccessCounter() { Help: "The number of work/super group chat msg successful processed", }) } + func NewWorkSuperGroupChatMsgProcessFailedCounter() { if WorkSuperGroupChatMsgProcessFailedCounter != nil { return @@ -433,6 +438,7 @@ func NewMsgOfflinePushSuccessCounter() { Help: "The number of msg successful offline pushed", }) } + func NewMsgOfflinePushFailedCounter() { if MsgOfflinePushFailedCounter != nil { return diff --git a/pkg/common/tokenverify/jwt_token.go b/pkg/common/tokenverify/jwt_token.go index 68fee2602..317347b1d 100644 --- a/pkg/common/tokenverify/jwt_token.go +++ b/pkg/common/tokenverify/jwt_token.go @@ -29,7 +29,7 @@ import ( type Claims struct { UserID string - PlatformID int //login platform + PlatformID int // login platform jwt.RegisteredClaims } @@ -40,10 +40,11 @@ func BuildClaims(uid string, platformID int, ttl int64) Claims { UserID: uid, PlatformID: platformID, RegisteredClaims: jwt.RegisteredClaims{ - ExpiresAt: jwt.NewNumericDate(now.Add(time.Duration(ttl*24) * time.Hour)), //Expiration time - IssuedAt: jwt.NewNumericDate(now), //Issuing time - NotBefore: jwt.NewNumericDate(before), //Begin Effective time - }} + ExpiresAt: jwt.NewNumericDate(now.Add(time.Duration(ttl*24) * time.Hour)), // Expiration time + IssuedAt: jwt.NewNumericDate(now), // Issuing time + NotBefore: jwt.NewNumericDate(before), // Begin Effective time + }, + } } func secret() jwt.Keyfunc { @@ -101,9 +102,11 @@ func CheckAdmin(ctx context.Context) error { func ParseRedisInterfaceToken(redisToken interface{}) (*Claims, error) { return GetClaimFromToken(string(redisToken.([]uint8))) } + func IsManagerUserID(opUserID string) bool { return utils.IsContain(opUserID, config.Config.Manager.UserID) } + func WsVerifyToken(token, userID string, platformID int) error { claim, err := GetClaimFromToken(token) if err != nil { diff --git a/pkg/discoveryregistry/zookeeper/discover.go b/pkg/discoveryregistry/zookeeper/discover.go index 089d34afe..298894040 100644 --- a/pkg/discoveryregistry/zookeeper/discover.go +++ b/pkg/discoveryregistry/zookeeper/discover.go @@ -29,8 +29,10 @@ import ( "google.golang.org/grpc/resolver" ) -var ErrConnIsNil = errors.New("conn is nil") -var ErrConnIsNilButLocalNotNil = errors.New("conn is nil, but local is not nil") +var ( + ErrConnIsNil = errors.New("conn is nil") + ErrConnIsNilButLocalNotNil = errors.New("conn is nil, but local is not nil") +) func (s *ZkClient) watch() { for { @@ -54,7 +56,6 @@ func (s *ZkClient) watch() { case zk.EventNotWatching: } } - } func (s *ZkClient) GetConnsRemote(serviceName string) (conns []resolver.Address, err error) { diff --git a/pkg/errs/code.go b/pkg/errs/code.go index 4b7193824..9f70ab5e6 100644 --- a/pkg/errs/code.go +++ b/pkg/errs/code.go @@ -43,10 +43,10 @@ const ( CallbackError = 80000 - //通用错误码. - ServerInternalError = 500 //服务器内部错误 - ArgsError = 1001 //输入参数错误 - NoPermissionError = 1002 //权限不足 + // 通用错误码. + ServerInternalError = 500 // 服务器内部错误 + ArgsError = 1001 // 输入参数错误 + NoPermissionError = 1002 // 权限不足 DuplicateKeyError = 1003 RecordNotFoundError = 1004 // 记录不存在 diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index c3648a050..1e866c555 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -71,6 +71,7 @@ func (c *ConversationRpcClient) SetConversationMaxSeq(ctx context.Context, owner _, err := c.Client.SetConversationMaxSeq(ctx, &pbConversation.SetConversationMaxSeqReq{OwnerUserID: ownerUserIDs, ConversationID: conversationID, MaxSeq: maxSeq}) return err } + func (c *ConversationRpcClient) SetConversations(ctx context.Context, userIDs []string, conversation *pbConversation.ConversationReq) error { _, err := c.Client.SetConversations(ctx, &pbConversation.SetConversationsReq{UserIDs: userIDs, Conversation: conversation}) return err diff --git a/pkg/rpcclient/friend.go b/pkg/rpcclient/friend.go index 51b63700c..5a8ac0633 100644 --- a/pkg/rpcclient/friend.go +++ b/pkg/rpcclient/friend.go @@ -61,14 +61,13 @@ func (f *FriendRpcClient) GetFriendsInfo( return } -// possibleFriendUserID是否在userID的好友中 +// possibleFriendUserID是否在userID的好友中. func (f *FriendRpcClient) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (bool, error) { resp, err := f.Client.IsFriend(ctx, &friend.IsFriendReq{UserID1: userID, UserID2: possibleFriendUserID}) if err != nil { return false, err } return resp.InUser1Friends, nil - } func (f *FriendRpcClient) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) { diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 09e44884f..7632060b0 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -27,7 +27,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "google.golang.org/grpc" "google.golang.org/protobuf/proto" - // "google.golang.org/protobuf/proto" + // "google.golang.org/protobuf/proto". ) func newContentTypeConf() map[int32]config.NotificationConf { diff --git a/pkg/rpcclient/notification/conevrsation.go b/pkg/rpcclient/notification/conevrsation.go index 12e851375..d2995df3d 100644 --- a/pkg/rpcclient/notification/conevrsation.go +++ b/pkg/rpcclient/notification/conevrsation.go @@ -30,7 +30,7 @@ func NewConversationNotificationSender(msgRpcClient *rpcclient.MessageRpcClient) return &ConversationNotificationSender{rpcclient.NewNotificationSender(rpcclient.WithRpcClient(msgRpcClient))} } -// SetPrivate调用 +// SetPrivate调用. func (c *ConversationNotificationSender) ConversationSetPrivateNotification( ctx context.Context, sendID, recvID string, @@ -44,7 +44,7 @@ func (c *ConversationNotificationSender) ConversationSetPrivateNotification( return c.Notification(ctx, sendID, recvID, constant.ConversationPrivateChatNotification, tips) } -// 会话改变 +// 会话改变. func (c *ConversationNotificationSender) ConversationChangeNotification(ctx context.Context, userID string) error { tips := &sdkws.ConversationUpdateTips{ UserID: userID, @@ -52,7 +52,7 @@ func (c *ConversationNotificationSender) ConversationChangeNotification(ctx cont return c.Notification(ctx, userID, userID, constant.ConversationChangeNotification, tips) } -// 会话未读数同步 +// 会话未读数同步. func (c *ConversationNotificationSender) ConversationUnreadChangeNotification( ctx context.Context, userID, conversationID string, diff --git a/pkg/rpcclient/notification/group.go b/pkg/rpcclient/notification/group.go index 3f312c241..61b12d58d 100644 --- a/pkg/rpcclient/notification/group.go +++ b/pkg/rpcclient/notification/group.go @@ -442,8 +442,10 @@ func (g *GroupNotificationSender) GroupMemberMutedNotification(ctx context.Conte if err != nil { return err } - tips := &sdkws.GroupMemberMutedTips{Group: group, MutedSeconds: mutedSeconds, - OpUser: user[mcontext.GetOpUserID(ctx)], MutedUser: user[groupMemberUserID]} + tips := &sdkws.GroupMemberMutedTips{ + Group: group, MutedSeconds: mutedSeconds, + OpUser: user[mcontext.GetOpUserID(ctx)], MutedUser: user[groupMemberUserID], + } if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return err } diff --git a/pkg/rpcclient/third.go b/pkg/rpcclient/third.go index 578586ca3..e1181b8f4 100644 --- a/pkg/rpcclient/third.go +++ b/pkg/rpcclient/third.go @@ -16,9 +16,10 @@ package rpcclient import ( "context" + "net/url" + "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" - "net/url" "google.golang.org/grpc" @@ -54,7 +55,7 @@ func minioInit() (*minio.Client, error) { } opts := &minio.Options{ Creds: credentials.NewStaticV4(config.Config.Object.Minio.AccessKeyID, config.Config.Object.Minio.SecretAccessKey, ""), - //Region: config.Config.Credential.Minio.Location, + // Region: config.Config.Credential.Minio.Location, } if minioUrl.Scheme == "http" { opts.Secure = false diff --git a/scripts/make-rules/golang.mk b/scripts/make-rules/golang.mk index fefa36c04..adefa5085 100644 --- a/scripts/make-rules/golang.mk +++ b/scripts/make-rules/golang.mk @@ -57,10 +57,18 @@ ifeq (${BINS},) $(error Could not determine BINS, set ROOT_DIR or run in source dir) endif -ifeq (${COMMANDS},) +ifeq ($(OS),Windows_NT) + NULL := + SPACE := $(NULL) $(NULL) + ROOT_DIR := $(subst $(SPACE),\$(SPACE),$(shell cd)) +else + ROOT_DIR := $(shell pwd) +endif + +ifeq ($(strip $(COMMANDS)),) $(error Could not determine COMMANDS, set ROOT_DIR or run in source dir) endif -ifeq (${BINS},) +ifeq ($(strip $(BINS)),) $(error Could not determine BINS, set ROOT_DIR or run in source dir) endif @@ -184,4 +192,4 @@ go.clean: ## copyright.help: Show copyright help .PHONY: go.help go.help: scripts/make-rules/golang.mk - $(call smallhelp) + $(call smallhelp) \ No newline at end of file