Feat: master node ping slave node in REST API

pull/1040/head
HFO4 4 years ago
parent 5316b8ed11
commit 08704fffb0

@ -5,8 +5,8 @@ import "github.com/jinzhu/gorm"
// Node 从机节点信息模型 // Node 从机节点信息模型
type Node struct { type Node struct {
gorm.Model gorm.Model
Status NodeStatus // 任务状态 Status NodeStatus // 节点状态
Type ModelType // 任务状态 Type ModelType // 节点状态
Server string // 服务器地址 Server string // 服务器地址
SecretKey string `gorm:"type:text"` // 通信密钥 SecretKey string `gorm:"type:text"` // 通信密钥
Aria2Enabled bool // 是否支持用作离线下载节点 Aria2Enabled bool // 是否支持用作离线下载节点

@ -2,12 +2,17 @@ package cluster
import ( import (
model "github.com/cloudreve/Cloudreve/v3/models" model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
) )
type MasterNode struct { type MasterNode struct {
Model *model.Node Model *model.Node
} }
func (node *MasterNode) Ping(req *serializer.NodePingReq) (*serializer.NodePingResp, error) {
return &serializer.NodePingResp{}, nil
}
// IsFeatureEnabled 查询节点的某项功能是否启用 // IsFeatureEnabled 查询节点的某项功能是否启用
func (node *MasterNode) IsFeatureEnabled(feature string) bool { func (node *MasterNode) IsFeatureEnabled(feature string) bool {
switch feature { switch feature {

@ -1,17 +1,25 @@
package cluster package cluster
import model "github.com/cloudreve/Cloudreve/v3/models" import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
)
type Node interface { type Node interface {
IsFeatureEnabled(feature string) bool IsFeatureEnabled(feature string) bool
SubscribeStatusChange(callback func()) SubscribeStatusChange(callback func())
Ping(req *serializer.NodePingReq) (*serializer.NodePingResp, error)
} }
func getNodeFromDBModel(node *model.Node) Node { func getNodeFromDBModel(node *model.Node) Node {
switch node.Type { switch node.Type {
case model.SlaveNodeType: case model.SlaveNodeType:
return &SlaveNode{ return &SlaveNode{
Model: node, Model: node,
AuthInstance: auth.HMACAuth{SecretKey: []byte(node.SecretKey)},
Client: request.HTTPClient{},
} }
default: default:
return &MasterNode{ return &MasterNode{

@ -1,13 +1,27 @@
package cluster package cluster
import ( import (
"context"
"encoding/json"
"errors"
model "github.com/cloudreve/Cloudreve/v3/models" model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"net/url"
"path"
"strings"
"sync" "sync"
"time"
) )
type SlaveNode struct { type SlaveNode struct {
Model *model.Node Model *model.Node
AuthInstance auth.Auth
Client request.Client
callback func() callback func()
ctx context.Context
lock sync.RWMutex lock sync.RWMutex
} }
@ -26,5 +40,68 @@ func (node *SlaveNode) SubscribeStatusChange(callback func()) {
node.lock.Unlock() node.lock.Unlock()
} }
// Ping 从机节点,返回从机负载
func (node *SlaveNode) Ping(req *serializer.NodePingReq) (*serializer.NodePingResp, error) {
reqBodyEncoded, err := json.Marshal(req)
if err != nil {
return nil, err
}
bodyReader := strings.NewReader(string(reqBodyEncoded))
signTTL := model.GetIntSetting("slave_api_timeout", 60)
resp, err := node.Client.Request(
"POST",
node.getAPIUrl("heartbeat"),
bodyReader,
request.WithCredential(node.AuthInstance, int64(signTTL)),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return nil, err
}
// 处理列取结果
if resp.Code != 0 {
return nil, errors.New(resp.Error)
}
var res serializer.NodePingResp
if resStr, ok := resp.Data.(string); ok {
err = json.Unmarshal([]byte(resStr), &res)
if err != nil {
return nil, err
}
}
return &res, nil
}
// getAPIUrl 获取接口请求地址
func (node *SlaveNode) getAPIUrl(scope string) string {
node.lock.RLock()
serverURL, err := url.Parse(node.Model.Server)
node.lock.RUnlock()
if err != nil {
return ""
}
var controller *url.URL
controller, _ = url.Parse("/api/v3/slave")
controller.Path = path.Join(controller.Path, scope)
return serverURL.ResolveReference(controller).String()
}
func (node *SlaveNode) pingLoop() {
t := time.NewTicker(time.Second)
for {
select {
case <-t.C:
case <-node.ctx.Done():
}
}
}
// PingLoop // PingLoop
// RecoverLoop // RecoverLoop

@ -10,3 +10,11 @@ type ListRequest struct {
Path string `json:"path"` Path string `json:"path"`
Recursive bool `json:"recursive"` Recursive bool `json:"recursive"`
} }
// NodePingReq 从机节点Ping请求
type NodePingReq struct {
}
// NodePingResp 从机节点Ping响应
type NodePingResp struct {
}

@ -11,6 +11,7 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/service/admin" "github.com/cloudreve/Cloudreve/v3/service/admin"
"github.com/cloudreve/Cloudreve/v3/service/explorer" "github.com/cloudreve/Cloudreve/v3/service/explorer"
"github.com/cloudreve/Cloudreve/v3/service/node"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
@ -175,3 +176,14 @@ func SlaveList(c *gin.Context) {
c.JSON(200, ErrorResponse(err)) c.JSON(200, ErrorResponse(err))
} }
} }
// SlaveHeartbeat 接受主机心跳包
func SlaveHeartbeat(c *gin.Context) {
var service serializer.NodePingReq
if err := c.ShouldBindJSON(&service); err == nil {
res := node.HandleMasterHeartbeat(&service)
c.JSON(200, res)
} else {
c.JSON(200, ErrorResponse(err))
}
}

@ -37,6 +37,8 @@ func InitSlaveRouter() *gin.Engine {
{ {
// Ping // Ping
v3.POST("ping", controllers.SlavePing) v3.POST("ping", controllers.SlavePing)
// 接收主机心跳包
v3.POST("heartbeat", controllers.SlaveHeartbeat)
// 上传 // 上传
v3.POST("upload", controllers.SlaveUpload) v3.POST("upload", controllers.SlaveUpload)
// 下载 // 下载

@ -0,0 +1,10 @@
package node
import "github.com/cloudreve/Cloudreve/v3/pkg/serializer"
func HandleMasterHeartbeat(req *serializer.NodePingReq) serializer.Response {
return serializer.Response{
Code: 0,
Data: serializer.NodePingResp{},
}
}
Loading…
Cancel
Save