Merge branch 'main' into send_msg

pull/954/head
wangchuxiao 2 years ago
commit 595f37a1c1

@ -37,8 +37,8 @@ require (
require github.com/google/uuid v1.3.0
require (
github.com/OpenIMSDK/protocol v0.0.14
github.com/OpenIMSDK/tools v0.0.13
github.com/OpenIMSDK/protocol v0.0.15
github.com/OpenIMSDK/tools v0.0.14
github.com/aliyun/aliyun-oss-go-sdk v2.2.8+incompatible
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.7.1

@ -17,10 +17,10 @@ cloud.google.com/go/storage v1.30.1/go.mod h1:NfxhC0UJE1aXSx7CIIbCf7y9HKT7Biccwk
firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4=
firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/OpenIMSDK/protocol v0.0.14 h1:cvQ3f8MTcyYygAnZ7Exq6zIbvHGCEV0fWdpzjQEDDBQ=
github.com/OpenIMSDK/protocol v0.0.14/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.13 h1:rcw4HS8S2DPZR9UOBxD8/ol9UBMzXBypzOVEytDRIMo=
github.com/OpenIMSDK/tools v0.0.13/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/OpenIMSDK/protocol v0.0.15 h1:KrrvdHH9kFF/tFYL2FXRPAr2e5F5DctSHfHq6MQjUI4=
github.com/OpenIMSDK/protocol v0.0.15/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.14 h1:WLof/+WxyPyRST+QkoTKubYCiV73uCLiL8pgnpH/yKQ=
github.com/OpenIMSDK/tools v0.0.14/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/Shopify/sarama v1.29.0 h1:ARid8o8oieau9XrHI55f/L3EoRAhm9px6sonbD7yuUE=
github.com/Shopify/sarama v1.29.0/go.mod h1:2QpgD79wpdAESqNQMxNc0KYMkycd4slxGdV3TWSVqrU=

@ -26,7 +26,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/http"
)
func url() string {
func callBackURL() string {
return config.Config.Callback.CallbackUrl
}
@ -49,7 +49,7 @@ func CallbackUserOnline(ctx context.Context, userID string, platformID int, isAp
ConnID: connID,
}
resp := cbapi.CommonCallbackResp{}
return http.CallBackPostReturn(ctx, url(), &req, &resp, config.Config.Callback.CallbackUserOnline)
return http.CallBackPostReturn(ctx, callBackURL(), &req, &resp, config.Config.Callback.CallbackUserOnline)
}
func CallbackUserOffline(ctx context.Context, userID string, platformID int, connID string) error {
@ -70,7 +70,7 @@ func CallbackUserOffline(ctx context.Context, userID string, platformID int, con
ConnID: connID,
}
resp := &cbapi.CallbackUserOfflineResp{}
return http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackUserOffline)
return http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline)
}
func CallbackUserKickOff(ctx context.Context, userID string, platformID int) error {
@ -90,7 +90,7 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err
Seq: time.Now().UnixMilli(),
}
resp := &cbapi.CommonCallbackResp{}
return http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackUserOffline)
return http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline)
}
// func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackground bool, connID

@ -16,6 +16,7 @@ package msggateway
import (
"net/http"
"net/url"
"strconv"
"time"
@ -71,6 +72,11 @@ func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnCont
ConnID: utils.Md5(req.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))),
}
}
func newTempContext() *UserConnContext {
return &UserConnContext{
Req: &http.Request{URL: &url.URL{}},
}
}
func (c *UserConnContext) GetRemoteAddr() string {
return c.RemoteAddr
@ -116,9 +122,15 @@ func (c *UserConnContext) GetOperationID() string {
return c.Req.URL.Query().Get(OperationID)
}
func (c *UserConnContext) SetOperationID(operationID string) {
c.Req.URL.Query().Set(OperationID, operationID)
}
func (c *UserConnContext) GetToken() string {
return c.Req.URL.Query().Get(Token)
}
func (c *UserConnContext) SetToken(token string) {
c.Req.URL.RawQuery = Token + "=" + token
}
func (c *UserConnContext) GetBackground() bool {
b, err := strconv.ParseBool(c.Req.URL.Query().Get(BackgroundStatus))

@ -17,6 +17,8 @@ package msggateway
import (
"context"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/authverify"
"github.com/OpenIMSDK/tools/errs"
@ -35,13 +37,13 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/startrpc"
)
func (s *Server) InitServer(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
func (s *Server) InitServer(disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis()
if err != nil {
return err
}
msgModel := cache.NewMsgCacheModel(rdb)
s.LongConnServer.SetDiscoveryRegistry(client)
s.LongConnServer.SetDiscoveryRegistry(disCov)
s.LongConnServer.SetCacheHandler(msgModel)
msggateway.RegisterMsgGatewayServer(server, s)
return nil
@ -198,6 +200,20 @@ func (s *Server) MultiTerminalLoginCheck(
ctx context.Context,
req *msggateway.MultiTerminalLoginCheckReq,
) (*msggateway.MultiTerminalLoginCheckResp, error) {
// TODO implement me
panic("implement me")
if oldClients, userOK, clientOK := s.LongConnServer.GetUserPlatformCons(req.UserID, int(req.PlatformID)); userOK {
tempUserCtx := newTempContext()
tempUserCtx.SetToken(req.Token)
tempUserCtx.SetOperationID(mcontext.GetOperationID(ctx))
client := &Client{}
client.ctx = tempUserCtx
client.UserID = req.UserID
client.PlatformID = int(req.PlatformID)
i := &kickHandler{
clientOK: clientOK,
oldClients: oldClients,
newClient: client,
}
s.LongConnServer.SetKickHandlerInfo(i)
}
return &msggateway.MultiTerminalLoginCheckResp{}, nil
}

@ -23,6 +23,8 @@ import (
"sync/atomic"
"time"
"github.com/OpenIMSDK/protocol/msggateway"
"github.com/OpenIMSDK/Open-IM-Server/pkg/authverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
@ -52,6 +54,7 @@ type LongConnServer interface {
SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry)
KickUserConn(client *Client) error
UnRegister(c *Client)
SetKickHandlerInfo(i *kickHandler)
Compressor
Encoder
MessageHandler
@ -78,6 +81,7 @@ type WsServer struct {
validate *validator.Validate
cache cache.MsgModel
userClient *rpcclient.UserRpcClient
disCov discoveryregistry.SvcDiscoveryRegistry
Compressor
Encoder
MessageHandler
@ -88,10 +92,11 @@ type kickHandler struct {
newClient *Client
}
func (ws *WsServer) SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) {
ws.MessageHandler = NewGrpcHandler(ws.validate, client)
u := rpcclient.NewUserRpcClient(client)
func (ws *WsServer) SetDiscoveryRegistry(disCov discoveryregistry.SvcDiscoveryRegistry) {
ws.MessageHandler = NewGrpcHandler(ws.validate, disCov)
u := rpcclient.NewUserRpcClient(disCov)
ws.userClient = &u
ws.disCov = disCov
}
func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, status int32) {
@ -180,6 +185,31 @@ func (ws *WsServer) Run() error {
return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) // Start listening
}
func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error {
conns, err := ws.disCov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
if err != nil {
return err
}
// Online push user online message to other node
for _, v := range conns {
if v.Target() == ws.disCov.GetSelfConnTarget() {
log.ZDebug(ctx, "Filter out this node", "node", v.Target())
continue
}
msgClient := msggateway.NewMsgGatewayClient(v)
_, err := msgClient.MultiTerminalLoginCheck(ctx, &msggateway.MultiTerminalLoginCheckReq{UserID: client.UserID,
PlatformID: int32(client.PlatformID), Token: client.token})
if err != nil {
log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target())
continue
}
}
return nil
}
func (ws *WsServer) SetKickHandlerInfo(i *kickHandler) {
ws.kickHandlerChan <- i
}
func (ws *WsServer) registerClient(client *Client) {
var (
userOK bool
@ -211,6 +241,7 @@ func (ws *WsServer) registerClient(client *Client) {
atomic.AddInt64(&ws.onlineUserConnNum, 1)
}
}
ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
ws.SetUserOnlineStatus(client.ctx, client, constant.Online)
log.ZInfo(
client.ctx,
@ -249,7 +280,10 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
fallthrough
case constant.AllLoginButSameTermKick:
if clientOK {
ws.clients.deleteClients(newClient.UserID, oldClients)
isDeleteUser := ws.clients.deleteClients(newClient.UserID, oldClients)
if isDeleteUser {
atomic.AddInt64(&ws.onlineUserNum, -1)
}
for _, c := range oldClients {
err := c.KickOnlineMessage()
if err != nil {
@ -301,7 +335,8 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
m[k] = constant.KickedToken
}
}
log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID", newClient.UserID)
log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID",
newClient.UserID, "token", newClient.ctx.GetToken())
err = ws.cache.SetTokenMapByUidPid(newClient.ctx, newClient.UserID, newClient.PlatformID, m)
if err != nil {
log.ZWarn(newClient.ctx, "SetTokenMapByUidPid err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID)

@ -134,7 +134,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID
return err
}
for _, v := range conns {
log.ZDebug(ctx, "forceKickOff", "conn", v.(*grpc.ClientConn).Target())
log.ZDebug(ctx, "forceKickOff", "conn", v.Target())
}
for _, v := range conns {
client := msggateway.NewMsgGatewayClient(v)

@ -49,6 +49,9 @@ func UpdateGroupInfoMap(ctx context.Context, group *sdkws.GroupInfoForSet) map[s
if group.ApplyMemberFriend != nil {
m["apply_member_friend"] = group.ApplyMemberFriend.Value
}
if group.Ex != nil {
m["ex"] = group.Ex.Value
}
return m
}

@ -78,7 +78,6 @@ func NewMinio() (s3.Interface, error) {
}
m := &Minio{
bucket: conf.Bucket,
bucketURL: conf.Endpoint + "/" + conf.Bucket + "/",
core: &minio.Core{Client: client},
lock: &sync.Mutex{},
init: false,
@ -86,6 +85,7 @@ func NewMinio() (s3.Interface, error) {
if conf.SignEndpoint == "" || conf.SignEndpoint == conf.Endpoint {
m.opts = opts
m.sign = m.core.Client
m.bucketURL = conf.Endpoint + "/" + conf.Bucket + "/"
} else {
su, err := url.Parse(conf.SignEndpoint)
if err != nil {
@ -99,6 +99,7 @@ func NewMinio() (s3.Interface, error) {
if err != nil {
return nil, err
}
m.bucketURL = conf.SignEndpoint + "/" + conf.Bucket + "/"
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

@ -16,6 +16,7 @@ package data_conversion
import (
"fmt"
"github.com/Shopify/sarama"
)

@ -17,11 +17,13 @@ package data_conversion
import (
"context"
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"time"
"github.com/OpenIMSDK/tools/log"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
)
var (

Loading…
Cancel
Save