From 08704fffb00c54e616f262bca090816471d9295c Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Thu, 19 Aug 2021 20:17:57 +0800 Subject: [PATCH] Feat: master node ping slave node in REST API --- models/node.go | 4 +- pkg/cluster/master.go | 5 +++ pkg/cluster/node.go | 12 +++++- pkg/cluster/slave.go | 79 +++++++++++++++++++++++++++++++++++- pkg/serializer/slave.go | 8 ++++ routers/controllers/slave.go | 12 ++++++ routers/router.go | 2 + service/node/fabric.go | 10 +++++ 8 files changed, 127 insertions(+), 5 deletions(-) create mode 100644 service/node/fabric.go diff --git a/models/node.go b/models/node.go index 40371e72..8fb67918 100644 --- a/models/node.go +++ b/models/node.go @@ -5,8 +5,8 @@ import "github.com/jinzhu/gorm" // Node 从机节点信息模型 type Node struct { gorm.Model - Status NodeStatus // 任务状态 - Type ModelType // 任务状态 + Status NodeStatus // 节点状态 + Type ModelType // 节点状态 Server string // 服务器地址 SecretKey string `gorm:"type:text"` // 通信密钥 Aria2Enabled bool // 是否支持用作离线下载节点 diff --git a/pkg/cluster/master.go b/pkg/cluster/master.go index 278a2296..27478428 100644 --- a/pkg/cluster/master.go +++ b/pkg/cluster/master.go @@ -2,12 +2,17 @@ package cluster import ( model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/serializer" ) type MasterNode struct { Model *model.Node } +func (node *MasterNode) Ping(req *serializer.NodePingReq) (*serializer.NodePingResp, error) { + return &serializer.NodePingResp{}, nil +} + // IsFeatureEnabled 查询节点的某项功能是否启用 func (node *MasterNode) IsFeatureEnabled(feature string) bool { switch feature { diff --git a/pkg/cluster/node.go b/pkg/cluster/node.go index b3085686..7c08d5d0 100644 --- a/pkg/cluster/node.go +++ b/pkg/cluster/node.go @@ -1,17 +1,25 @@ package cluster -import model "github.com/cloudreve/Cloudreve/v3/models" +import ( + model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/auth" + "github.com/cloudreve/Cloudreve/v3/pkg/request" + "github.com/cloudreve/Cloudreve/v3/pkg/serializer" +) type Node interface { IsFeatureEnabled(feature string) bool SubscribeStatusChange(callback func()) + Ping(req *serializer.NodePingReq) (*serializer.NodePingResp, error) } func getNodeFromDBModel(node *model.Node) Node { switch node.Type { case model.SlaveNodeType: return &SlaveNode{ - Model: node, + Model: node, + AuthInstance: auth.HMACAuth{SecretKey: []byte(node.SecretKey)}, + Client: request.HTTPClient{}, } default: return &MasterNode{ diff --git a/pkg/cluster/slave.go b/pkg/cluster/slave.go index 852f039c..64ab2696 100644 --- a/pkg/cluster/slave.go +++ b/pkg/cluster/slave.go @@ -1,13 +1,27 @@ package cluster import ( + "context" + "encoding/json" + "errors" model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/auth" + "github.com/cloudreve/Cloudreve/v3/pkg/request" + "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "net/url" + "path" + "strings" "sync" + "time" ) type SlaveNode struct { - Model *model.Node + Model *model.Node + AuthInstance auth.Auth + Client request.Client + callback func() + ctx context.Context lock sync.RWMutex } @@ -26,5 +40,68 @@ func (node *SlaveNode) SubscribeStatusChange(callback func()) { node.lock.Unlock() } +// Ping 从机节点,返回从机负载 +func (node *SlaveNode) Ping(req *serializer.NodePingReq) (*serializer.NodePingResp, error) { + reqBodyEncoded, err := json.Marshal(req) + if err != nil { + return nil, err + } + + bodyReader := strings.NewReader(string(reqBodyEncoded)) + signTTL := model.GetIntSetting("slave_api_timeout", 60) + + resp, err := node.Client.Request( + "POST", + node.getAPIUrl("heartbeat"), + bodyReader, + request.WithCredential(node.AuthInstance, int64(signTTL)), + ).CheckHTTPResponse(200).DecodeResponse() + if err != nil { + return nil, err + } + + // 处理列取结果 + if resp.Code != 0 { + return nil, errors.New(resp.Error) + } + + var res serializer.NodePingResp + + if resStr, ok := resp.Data.(string); ok { + err = json.Unmarshal([]byte(resStr), &res) + if err != nil { + return nil, err + } + } + + return &res, nil +} + +// getAPIUrl 获取接口请求地址 +func (node *SlaveNode) getAPIUrl(scope string) string { + node.lock.RLock() + serverURL, err := url.Parse(node.Model.Server) + node.lock.RUnlock() + if err != nil { + return "" + } + + var controller *url.URL + controller, _ = url.Parse("/api/v3/slave") + controller.Path = path.Join(controller.Path, scope) + return serverURL.ResolveReference(controller).String() +} + +func (node *SlaveNode) pingLoop() { + t := time.NewTicker(time.Second) + + for { + select { + case <-t.C: + case <-node.ctx.Done(): + } + } +} + // PingLoop // RecoverLoop diff --git a/pkg/serializer/slave.go b/pkg/serializer/slave.go index e23e809d..f0ee1877 100644 --- a/pkg/serializer/slave.go +++ b/pkg/serializer/slave.go @@ -10,3 +10,11 @@ type ListRequest struct { Path string `json:"path"` Recursive bool `json:"recursive"` } + +// NodePingReq 从机节点Ping请求 +type NodePingReq struct { +} + +// NodePingResp 从机节点Ping响应 +type NodePingResp struct { +} diff --git a/routers/controllers/slave.go b/routers/controllers/slave.go index e10e2b0c..6d7e7f0c 100644 --- a/routers/controllers/slave.go +++ b/routers/controllers/slave.go @@ -11,6 +11,7 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/cloudreve/Cloudreve/v3/service/admin" "github.com/cloudreve/Cloudreve/v3/service/explorer" + "github.com/cloudreve/Cloudreve/v3/service/node" "github.com/gin-gonic/gin" ) @@ -175,3 +176,14 @@ func SlaveList(c *gin.Context) { c.JSON(200, ErrorResponse(err)) } } + +// SlaveHeartbeat 接受主机心跳包 +func SlaveHeartbeat(c *gin.Context) { + var service serializer.NodePingReq + if err := c.ShouldBindJSON(&service); err == nil { + res := node.HandleMasterHeartbeat(&service) + c.JSON(200, res) + } else { + c.JSON(200, ErrorResponse(err)) + } +} diff --git a/routers/router.go b/routers/router.go index 3ae63e91..3209a985 100644 --- a/routers/router.go +++ b/routers/router.go @@ -37,6 +37,8 @@ func InitSlaveRouter() *gin.Engine { { // Ping v3.POST("ping", controllers.SlavePing) + // 接收主机心跳包 + v3.POST("heartbeat", controllers.SlaveHeartbeat) // 上传 v3.POST("upload", controllers.SlaveUpload) // 下载 diff --git a/service/node/fabric.go b/service/node/fabric.go new file mode 100644 index 00000000..78dfc307 --- /dev/null +++ b/service/node/fabric.go @@ -0,0 +1,10 @@ +package node + +import "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + +func HandleMasterHeartbeat(req *serializer.NodePingReq) serializer.Response { + return serializer.Response{ + Code: 0, + Data: serializer.NodePingResp{}, + } +}