From 0dc573253fc94eddca7838dbab6714f346fef3bd Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Mon, 18 Oct 2021 21:13:21 +0800 Subject: [PATCH] Feat: test aria2 rpc connection in slave --- models/migration.go | 5 --- pkg/aria2/aria2.go | 22 +++++++++++++ routers/controllers/admin.go | 9 +++++- routers/router.go | 5 +++ service/admin/aria2.go | 62 ++++++++++++++++++++++++++---------- service/admin/node.go | 13 ++++++-- service/admin/policy.go | 2 +- 7 files changed, 92 insertions(+), 26 deletions(-) diff --git a/models/migration.go b/models/migration.go index 3e493f03..8bbca38d 100644 --- a/models/migration.go +++ b/models/migration.go @@ -140,11 +140,6 @@ Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; verti {Name: "gravatar_server", Value: `https://www.gravatar.com/`, Type: "avatar"}, {Name: "defaultTheme", Value: `#3f51b5`, Type: "basic"}, {Name: "themes", Value: `{"#3f51b5":{"palette":{"primary":{"main":"#3f51b5"},"secondary":{"main":"#f50057"}}},"#2196f3":{"palette":{"primary":{"main":"#2196f3"},"secondary":{"main":"#FFC107"}}},"#673AB7":{"palette":{"primary":{"main":"#673AB7"},"secondary":{"main":"#2196F3"}}},"#E91E63":{"palette":{"primary":{"main":"#E91E63"},"secondary":{"main":"#42A5F5","contrastText":"#fff"}}},"#FF5722":{"palette":{"primary":{"main":"#FF5722"},"secondary":{"main":"#3F51B5"}}},"#FFC107":{"palette":{"primary":{"main":"#FFC107"},"secondary":{"main":"#26C6DA"}}},"#8BC34A":{"palette":{"primary":{"main":"#8BC34A","contrastText":"#fff"},"secondary":{"main":"#FF8A65","contrastText":"#fff"}}},"#009688":{"palette":{"primary":{"main":"#009688"},"secondary":{"main":"#4DD0E1","contrastText":"#fff"}}},"#607D8B":{"palette":{"primary":{"main":"#607D8B"},"secondary":{"main":"#F06292"}}},"#795548":{"palette":{"primary":{"main":"#795548"},"secondary":{"main":"#4CAF50","contrastText":"#fff"}}}}`, Type: "basic"}, - {Name: "aria2_token", Value: ``, Type: "aria2"}, - {Name: "aria2_rpcurl", Value: ``, Type: "aria2"}, - {Name: "aria2_temp_path", Value: ``, Type: "aria2"}, - {Name: "aria2_options", Value: `{}`, Type: "aria2"}, - {Name: "aria2_interval", Value: `60`, Type: "aria2"}, {Name: "max_worker_num", Value: `10`, Type: "task"}, {Name: "max_parallel_transfer", Value: `4`, Type: "task"}, {Name: "secret_key", Value: util.RandStringRunes(256), Type: "auth"}, diff --git a/pkg/aria2/aria2.go b/pkg/aria2/aria2.go index b86ecd99..d7f9abe0 100644 --- a/pkg/aria2/aria2.go +++ b/pkg/aria2/aria2.go @@ -1,11 +1,16 @@ package aria2 import ( + "context" + "fmt" + "net/url" "sync" + "time" model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/aria2/common" "github.com/cloudreve/Cloudreve/v3/pkg/aria2/monitor" + "github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc" "github.com/cloudreve/Cloudreve/v3/pkg/balancer" ) @@ -41,3 +46,20 @@ func Init(isReload bool) { } } } + +// TestRPCConnection 发送测试用的 RPC 请求,测试服务连通性 +func TestRPCConnection(server, secret string, timeout int) (rpc.VersionInfo, error) { + // 解析RPC服务地址 + rpcServer, err := url.Parse(server) + if err != nil { + return rpc.VersionInfo{}, fmt.Errorf("cannot parse RPC server: %w", err) + } + + rpcServer.Path = "/jsonrpc" + caller, err := rpc.New(context.Background(), rpcServer.String(), secret, time.Duration(timeout)*time.Second, nil) + if err != nil { + return rpc.VersionInfo{}, fmt.Errorf("cannot initialize rpc connection: %w", err) + } + + return caller.GetVersion() +} diff --git a/routers/controllers/admin.go b/routers/controllers/admin.go index 4c2cdaf1..4f3440f6 100644 --- a/routers/controllers/admin.go +++ b/routers/controllers/admin.go @@ -3,6 +3,7 @@ package controllers import ( "io" + model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/aria2" "github.com/cloudreve/Cloudreve/v3/pkg/email" "github.com/cloudreve/Cloudreve/v3/pkg/request" @@ -92,7 +93,13 @@ func AdminSendTestMail(c *gin.Context) { func AdminTestAria2(c *gin.Context) { var service admin.Aria2TestService if err := c.ShouldBindJSON(&service); err == nil { - res := service.Test() + var res serializer.Response + if service.Type == model.MasterNodeType { + res = service.TestMaster() + } else { + res = service.TestSlave() + } + c.JSON(200, res) } else { c.JSON(200, ErrorResponse(err)) diff --git a/routers/router.go b/routers/router.go index b1dd9c32..638aa757 100644 --- a/routers/router.go +++ b/routers/router.go @@ -40,6 +40,8 @@ func InitSlaveRouter() *gin.Engine { { // Ping v3.POST("ping", controllers.SlavePing) + // 测试 Aria2 RPC 连接 + v3.POST("ping/aria2", controllers.AdminTestAria2) // 接收主机心跳包 v3.POST("heartbeat", controllers.SlaveHeartbeat) // 上传 @@ -446,6 +448,9 @@ func InitMasterRouter() *gin.Engine { { // 列出从机节点 node.POST("list", controllers.AdminListNodes) + + // 列出从机节点 + node.POST("aria2/test", controllers.AdminTestAria2) } } diff --git a/service/admin/aria2.go b/service/admin/aria2.go index 8801c962..0df3275b 100644 --- a/service/admin/aria2.go +++ b/service/admin/aria2.go @@ -1,43 +1,71 @@ package admin import ( + "bytes" + "encoding/json" + model "github.com/cloudreve/Cloudreve/v3/models" "net/url" + "time" "github.com/cloudreve/Cloudreve/v3/pkg/aria2" + "github.com/cloudreve/Cloudreve/v3/pkg/auth" + "github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" ) // Aria2TestService aria2连接测试服务 type Aria2TestService struct { - Server string `json:"server" binding:"required"` - Token string `json:"token"` + Server string `json:"server" binding:"required"` + RPC string `json:"rpc" binding:"required"` + Secret string `json:"secret" binding:"required"` + Token string `json:"token"` + Type model.ModelType `json:"type"` } // Test 测试aria2连接 -func (service *Aria2TestService) Test() serializer.Response { - testRPC := aria2.RPCService{} - - // 解析RPC服务地址 - server, err := url.Parse(service.Server) +func (service *Aria2TestService) TestMaster() serializer.Response { + res, err := aria2.TestRPCConnection(service.RPC, service.Token, 5) if err != nil { - return serializer.ParamErr("无法解析 aria2 RPC 服务地址, "+err.Error(), nil) + return serializer.ParamErr(err.Error(), err) } - server.Path = "/jsonrpc" - if err := testRPC.Init(server.String(), service.Token, 5, map[string]interface{}{}); err != nil { - return serializer.ParamErr("无法初始化连接, "+err.Error(), nil) + if res.Version == "" { + return serializer.ParamErr("RPC 服务返回非预期响应", nil) } - defer testRPC.Caller.Close() + return serializer.Response{Data: res.Version} +} - info, err := testRPC.Caller.GetVersion() +func (service *Aria2TestService) TestSlave() serializer.Response { + slave, err := url.Parse(service.Server) if err != nil { - return serializer.ParamErr("无法请求 RPC 服务, "+err.Error(), nil) + return serializer.ParamErr("无法解析从机端地址,"+err.Error(), nil) } - if info.Version == "" { - return serializer.ParamErr("RPC 服务返回非预期响应", nil) + controller, _ := url.Parse("/api/v3/slave/ping/aria2") + + // 请求正文 + service.Type = model.MasterNodeType + bodyByte, _ := json.Marshal(service) + + r := request.NewClient() + res, err := r.Request( + "POST", + slave.ResolveReference(controller).String(), + bytes.NewReader(bodyByte), + request.WithTimeout(time.Duration(10)*time.Second), + request.WithCredential( + auth.HMACAuth{SecretKey: []byte(service.Secret)}, + int64(model.GetIntSetting("slave_api_timeout", 60)), + ), + ).DecodeResponse() + if err != nil { + return serializer.ParamErr("无连接到从机,"+err.Error(), nil) + } + + if res.Code != 0 { + return serializer.ParamErr("成功接到从机,但是从机返回:"+res.Msg, nil) } - return serializer.Response{Data: info.Version} + return serializer.Response{Data: res.Data.(string)} } diff --git a/service/admin/node.go b/service/admin/node.go index 17084fbf..6f70c7b1 100644 --- a/service/admin/node.go +++ b/service/admin/node.go @@ -2,6 +2,7 @@ package admin import ( model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/cluster" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" "strings" ) @@ -35,8 +36,16 @@ func (service *AdminListService) Nodes() serializer.Response { // 查询记录 tx.Limit(service.PageSize).Offset((service.Page - 1) * service.PageSize).Find(&res) + isActive := make(map[uint]bool) + for i := 0; i < len(res); i++ { + if node := cluster.Default.GetNodeByID(res[i].ID); node != nil { + isActive[res[i].ID] = node.IsActive() + } + } + return serializer.Response{Data: map[string]interface{}{ - "total": total, - "items": res, + "total": total, + "items": res, + "active": isActive, }} } diff --git a/service/admin/policy.go b/service/admin/policy.go index 657d7524..5c648ecd 100644 --- a/service/admin/policy.go +++ b/service/admin/policy.go @@ -245,7 +245,7 @@ func (service *SlaveTestService) Test() serializer.Response { } if res.Code != 0 { - return serializer.ParamErr("成功接到从机,但是"+res.Msg, nil) + return serializer.ParamErr("成功接到从机,但是从机返回:"+res.Msg, nil) } return serializer.Response{}