diff --git a/middleware/cluster_test.go b/middleware/cluster_test.go index 0ccdcb6..7e28e6f 100644 --- a/middleware/cluster_test.go +++ b/middleware/cluster_test.go @@ -6,7 +6,7 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/aria2/common" "github.com/cloudreve/Cloudreve/v3/pkg/auth" "github.com/cloudreve/Cloudreve/v3/pkg/cluster" - "github.com/cloudreve/Cloudreve/v3/pkg/mocks" + "github.com/cloudreve/Cloudreve/v3/pkg/mocks/controllermock" "github.com/gin-gonic/gin" "github.com/jinzhu/gorm" "github.com/stretchr/testify/assert" @@ -82,7 +82,7 @@ func TestUseSlaveAria2Instance(t *testing.T) { // MasterSiteID not set { - testController := &mocks.SlaveControllerMock{} + testController := &controllermock.SlaveControllerMock{} useSlaveAria2InstanceFunc := UseSlaveAria2Instance(testController) c, _ := gin.CreateTestContext(httptest.NewRecorder()) c.Request = httptest.NewRequest("GET", "/", nil) @@ -92,7 +92,7 @@ func TestUseSlaveAria2Instance(t *testing.T) { // Cannot get aria2 instances { - testController := &mocks.SlaveControllerMock{} + testController := &controllermock.SlaveControllerMock{} useSlaveAria2InstanceFunc := UseSlaveAria2Instance(testController) c, _ := gin.CreateTestContext(httptest.NewRecorder()) c.Request = httptest.NewRequest("GET", "/", nil) @@ -105,7 +105,7 @@ func TestUseSlaveAria2Instance(t *testing.T) { // Success { - testController := &mocks.SlaveControllerMock{} + testController := &controllermock.SlaveControllerMock{} useSlaveAria2InstanceFunc := UseSlaveAria2Instance(testController) c, _ := gin.CreateTestContext(httptest.NewRecorder()) c.Request = httptest.NewRequest("GET", "/", nil) diff --git a/pkg/filesystem/driver/onedrive/api.go b/pkg/filesystem/driver/onedrive/api.go index d8f7b38..784d906 100644 --- a/pkg/filesystem/driver/onedrive/api.go +++ b/pkg/filesystem/driver/onedrive/api.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/cloudreve/Cloudreve/v3/pkg/conf" "io" "io/ioutil" "net/http" @@ -573,7 +574,7 @@ func sysError(err error) *RespError { func (client *Client) request(ctx context.Context, method string, url string, body io.Reader, option ...request.Option) (string, *RespError) { // 获取凭证 - err := client.UpdateCredential(ctx) + err := client.UpdateCredential(ctx, conf.SystemConfig.Mode == "slave") if err != nil { return "", sysError(err) } diff --git a/pkg/filesystem/driver/onedrive/client.go b/pkg/filesystem/driver/onedrive/client.go index dbbca3c..dfd205c 100644 --- a/pkg/filesystem/driver/onedrive/client.go +++ b/pkg/filesystem/driver/onedrive/client.go @@ -2,6 +2,7 @@ package onedrive import ( "errors" + "github.com/cloudreve/Cloudreve/v3/pkg/cluster" model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/request" @@ -28,7 +29,8 @@ type Client struct { ClientSecret string Redirect string - Request request.Client + Request request.Client + ClusterController cluster.Controller } // Endpoints OneDrive客户端相关设置 @@ -51,11 +53,12 @@ func NewClient(policy *model.Policy) (*Client, error) { Credential: &Credential{ RefreshToken: policy.AccessKey, }, - Policy: policy, - ClientID: policy.BucketName, - ClientSecret: policy.SecretKey, - Redirect: policy.OptionsSerialized.OdRedirect, - Request: request.NewClient(), + Policy: policy, + ClientID: policy.BucketName, + ClientSecret: policy.SecretKey, + Redirect: policy.OptionsSerialized.OdRedirect, + Request: request.NewClient(), + ClusterController: cluster.DefaultController, } if client.Endpoints.DriverResource == "" { diff --git a/pkg/filesystem/driver/onedrive/oauth.go b/pkg/filesystem/driver/onedrive/oauth.go index 0d5865b..d7edbb5 100644 --- a/pkg/filesystem/driver/onedrive/oauth.go +++ b/pkg/filesystem/driver/onedrive/oauth.go @@ -3,7 +3,6 @@ package onedrive import ( "context" "encoding/json" - "github.com/cloudreve/Cloudreve/v3/pkg/cluster" "io/ioutil" "net/http" "net/url" @@ -11,7 +10,6 @@ import ( "time" "github.com/cloudreve/Cloudreve/v3/pkg/cache" - "github.com/cloudreve/Cloudreve/v3/pkg/conf" "github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/util" ) @@ -125,8 +123,8 @@ func (client *Client) ObtainToken(ctx context.Context, opts ...Option) (*Credent } // UpdateCredential 更新凭证,并检查有效期 -func (client *Client) UpdateCredential(ctx context.Context) error { - if conf.SystemConfig.Mode == "slave" { +func (client *Client) UpdateCredential(ctx context.Context, isSlave bool) error { + if isSlave { return client.fetchCredentialFromMaster(ctx) } @@ -179,7 +177,7 @@ func (client *Client) UpdateCredential(ctx context.Context) error { // UpdateCredential 更新凭证,并检查有效期 func (client *Client) fetchCredentialFromMaster(ctx context.Context) error { - res, err := cluster.DefaultController.GetOneDriveToken(client.Policy.MasterID, client.Policy.ID) + res, err := client.ClusterController.GetOneDriveToken(client.Policy.MasterID, client.Policy.ID) if err != nil { return err } diff --git a/pkg/filesystem/driver/onedrive/oauth_test.go b/pkg/filesystem/driver/onedrive/oauth_test.go index 62243dc..61c5e75 100644 --- a/pkg/filesystem/driver/onedrive/oauth_test.go +++ b/pkg/filesystem/driver/onedrive/oauth_test.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "errors" + "github.com/cloudreve/Cloudreve/v3/pkg/mocks/controllermock" "io" "io/ioutil" "net/http" @@ -269,10 +270,10 @@ func TestClient_UpdateCredential(t *testing.T) { // 无有效的RefreshToken { - err := client.UpdateCredential(context.Background()) + err := client.UpdateCredential(context.Background(), false) asserts.Equal(ErrInvalidRefreshToken, err) client.Credential = nil - err = client.UpdateCredential(context.Background()) + err = client.UpdateCredential(context.Background(), false) asserts.Equal(ErrInvalidRefreshToken, err) } @@ -299,7 +300,7 @@ func TestClient_UpdateCredential(t *testing.T) { mock.ExpectBegin() mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - err := client.UpdateCredential(context.Background()) + err := client.UpdateCredential(context.Background(), false) clientMock.AssertExpectations(t) asserts.NoError(mock.ExpectationsWereMet()) asserts.NoError(err) @@ -331,7 +332,7 @@ func TestClient_UpdateCredential(t *testing.T) { client.Credential = &Credential{ RefreshToken: "old_refresh_token", } - err := client.UpdateCredential(context.Background()) + err := client.UpdateCredential(context.Background(), false) clientMock.AssertExpectations(t) asserts.Error(err) } @@ -346,7 +347,7 @@ func TestClient_UpdateCredential(t *testing.T) { client.Credential = &Credential{ RefreshToken: "old_refresh_token", } - err := client.UpdateCredential(context.Background()) + err := client.UpdateCredential(context.Background(), false) asserts.NoError(err) asserts.Equal("AccessToken", client.Credential.AccessToken) asserts.Equal("RefreshToken", client.Credential.RefreshToken) @@ -359,8 +360,27 @@ func TestClient_UpdateCredential(t *testing.T) { AccessToken: "AccessToken2", ExpiresIn: time.Now().Add(time.Duration(10) * time.Second).Unix(), } - err := client.UpdateCredential(context.Background()) + err := client.UpdateCredential(context.Background(), false) asserts.NoError(err) asserts.Equal("AccessToken2", client.Credential.AccessToken) } + + // slave failed + { + mockController := &controllermock.SlaveControllerMock{} + mockController.On("GetOneDriveToken", testMock.Anything, testMock.Anything).Return("", errors.New("error")) + client.ClusterController = mockController + err := client.UpdateCredential(context.Background(), true) + asserts.Error(err) + } + + // slave success + { + mockController := &controllermock.SlaveControllerMock{} + mockController.On("GetOneDriveToken", testMock.Anything, testMock.Anything).Return("AccessToken3", nil) + client.ClusterController = mockController + err := client.UpdateCredential(context.Background(), true) + asserts.NoError(err) + asserts.Equal("AccessToken3", client.Credential.AccessToken) + } } diff --git a/pkg/filesystem/driver/remote/handler_test.go b/pkg/filesystem/driver/remote/handler_test.go index a1dc698..0a565f7 100644 --- a/pkg/filesystem/driver/remote/handler_test.go +++ b/pkg/filesystem/driver/remote/handler_test.go @@ -105,7 +105,7 @@ func TestHandler_Source(t *testing.T) { // 解析失败 自定义CDN { handler := Driver{ - Policy: &model.Policy{Server: "/", BaseURL: string(0x7f)}, + Policy: &model.Policy{Server: "/", BaseURL: string([]byte{0x7f})}, AuthInstance: auth.HMACAuth{}, } file := model.File{ diff --git a/pkg/filesystem/file_test.go b/pkg/filesystem/file_test.go index d80cd37..14f43cd 100644 --- a/pkg/filesystem/file_test.go +++ b/pkg/filesystem/file_test.go @@ -228,25 +228,25 @@ func TestFileSystem_deleteGroupedFile(t *testing.T) { }, } - // 全部失败 + // 全部不存在 { failed := fs.deleteGroupedFile(ctx, fs.GroupFileByPolicy(ctx, files)) asserts.Equal(map[uint][]string{ - 1: {"1_1.txt", "1_2.txt"}, - 2: {"2_1.txt", "2_2.txt"}, - 3: {"3_1.txt"}, + 1: {}, + 2: {}, + 3: {}, }, failed) } - // 部分失败 + // 部分不存在 { file, err := os.Create(util.RelativePath("1_1.txt")) asserts.NoError(err) _ = file.Close() failed := fs.deleteGroupedFile(ctx, fs.GroupFileByPolicy(ctx, files)) asserts.Equal(map[uint][]string{ - 1: {"1_2.txt"}, - 2: {"2_1.txt", "2_2.txt"}, - 3: {"3_1.txt"}, + 1: {}, + 2: {}, + 3: {}, }, failed) } // 部分失败,包含整组未知存储策略导致的失败 @@ -259,9 +259,9 @@ func TestFileSystem_deleteGroupedFile(t *testing.T) { files[3].Policy.Type = "unknown" failed := fs.deleteGroupedFile(ctx, fs.GroupFileByPolicy(ctx, files)) asserts.Equal(map[uint][]string{ - 1: {"1_2.txt"}, + 1: {}, 2: {"2_1.txt", "2_2.txt"}, - 3: {"3_1.txt"}, + 3: {}, }, failed) } } diff --git a/pkg/filesystem/filesystem_test.go b/pkg/filesystem/filesystem_test.go index 6ec56db..e071ece 100644 --- a/pkg/filesystem/filesystem_test.go +++ b/pkg/filesystem/filesystem_test.go @@ -1,6 +1,9 @@ package filesystem import ( + "github.com/cloudreve/Cloudreve/v3/pkg/cluster" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/shadow/masterinslave" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/shadow/slaveinmaster" "net/http/httptest" "github.com/DATA-DOG/go-sqlmock" @@ -104,6 +107,10 @@ func TestDispatchHandler(t *testing.T) { err = fs.DispatchHandler() asserts.NoError(err) + fs.Policy = &model.Policy{Type: "cos"} + err = fs.DispatchHandler() + asserts.NoError(err) + fs.Policy = &model.Policy{Type: "s3"} err = fs.DispatchHandler() asserts.NoError(err) @@ -262,3 +269,40 @@ func TestFileSystem_SetTargetByInterface(t *testing.T) { asserts.Len(fs.FileTarget, 1) } } + +func TestFileSystem_SwitchToSlaveHandler(t *testing.T) { + a := assert.New(t) + fs := FileSystem{ + User: &model.User{}, + } + mockNode := &cluster.MasterNode{ + Model: &model.Node{}, + } + fs.SwitchToSlaveHandler(mockNode) + a.IsType(&slaveinmaster.Driver{}, fs.Handler) +} + +func TestFileSystem_SwitchToShadowHandler(t *testing.T) { + a := assert.New(t) + fs := FileSystem{ + User: &model.User{}, + Policy: &model.Policy{}, + } + mockNode := &cluster.MasterNode{ + Model: &model.Node{}, + } + + // remote to local + { + fs.Policy.Type = "remote" + fs.SwitchToShadowHandler(mockNode, "", "") + a.IsType(&masterinslave.Driver{}, fs.Handler) + } + + // local to remote + { + fs.Policy.Type = "local" + fs.SwitchToShadowHandler(mockNode, "", "") + a.IsType(&masterinslave.Driver{}, fs.Handler) + } +} diff --git a/pkg/filesystem/hooks_test.go b/pkg/filesystem/hooks_test.go index 2548430..03fe325 100644 --- a/pkg/filesystem/hooks_test.go +++ b/pkg/filesystem/hooks_test.go @@ -708,3 +708,29 @@ func TestHookGiveBackCapacity(t *testing.T) { asserts.EqualValues(7, fs.User.Storage) } } + +func TestHookValidateCapacityWithoutIncrease(t *testing.T) { + a := assert.New(t) + fs := &FileSystem{ + User: &model.User{ + Model: gorm.Model{ID: 1}, + Storage: 10, + Group: model.Group{}, + }, + } + ctx := context.WithValue(context.Background(), fsctx.FileHeaderCtx, local.FileStream{Size: 1}) + + // not enough + { + fs.User.Group.MaxStorage = 10 + a.Error(HookValidateCapacityWithoutIncrease(ctx, fs)) + a.EqualValues(10, fs.User.Storage) + } + + // enough + { + fs.User.Group.MaxStorage = 11 + a.NoError(HookValidateCapacityWithoutIncrease(ctx, fs)) + a.EqualValues(10, fs.User.Storage) + } +} diff --git a/pkg/filesystem/manage_test.go b/pkg/filesystem/manage_test.go index 4156f34..f3186ef 100644 --- a/pkg/filesystem/manage_test.go +++ b/pkg/filesystem/manage_test.go @@ -356,47 +356,6 @@ func TestFileSystem_Delete(t *testing.T) { }} ctx := context.Background() - //全部未成功 - { - // 列出要删除的目录 - mock.ExpectQuery("SELECT(.+)"). - WillReturnRows( - sqlmock.NewRows([]string{"id"}). - AddRow(1). - AddRow(2). - AddRow(3), - ) - mock.ExpectQuery("SELECT(.+)"). - WithArgs(1, 2, 3). - WillReturnRows( - sqlmock.NewRows([]string{"id", "name", "source_name", "policy_id", "size"}). - AddRow(4, "1.txt", "1.txt", 2, 1), - ) - // 查询顶级的文件 - mock.ExpectQuery("SELECT(.+)").WillReturnRows(sqlmock.NewRows([]string{"id", "name", "source_name", "policy_id", "size"}).AddRow(1, "1.txt", "1.txt", 603, 2)) - mock.ExpectQuery("SELECT(.+)files(.+)"). - WillReturnRows(sqlmock.NewRows([]string{"id", "policy_id", "source_name"})) - // 查找软连接 - mock.ExpectQuery("SELECT(.+)").WillReturnRows(sqlmock.NewRows([]string{"id"})) - // 查询上传策略 - mock.ExpectQuery("SELECT(.+)").WillReturnRows(sqlmock.NewRows([]string{"id", "type"}).AddRow(603, "local")) - // 删除文件记录 - mock.ExpectBegin() - mock.ExpectExec("DELETE(.+)files"). - WillReturnResult(sqlmock.NewResult(0, 3)) - mock.ExpectCommit() - // 删除对应分享 - mock.ExpectBegin() - mock.ExpectExec("UPDATE(.+)shares"). - WillReturnResult(sqlmock.NewResult(0, 3)) - mock.ExpectCommit() - - err := fs.Delete(ctx, []uint{1}, []uint{1}, false) - asserts.Error(err) - asserts.Equal(203, err.(serializer.AppError).Code) - asserts.Equal(uint64(3), fs.User.Storage) - asserts.NoError(mock.ExpectationsWereMet()) - } //全部未成功,强制 { fs.CleanTargets() diff --git a/pkg/mocks/controllermock/c.go b/pkg/mocks/controllermock/c.go new file mode 100644 index 0000000..a2890b2 --- /dev/null +++ b/pkg/mocks/controllermock/c.go @@ -0,0 +1,43 @@ +package controllermock + +import ( + "github.com/cloudreve/Cloudreve/v3/pkg/aria2/common" + "github.com/cloudreve/Cloudreve/v3/pkg/cluster" + "github.com/cloudreve/Cloudreve/v3/pkg/mq" + "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "github.com/stretchr/testify/mock" +) + +type SlaveControllerMock struct { + mock.Mock +} + +func (s SlaveControllerMock) HandleHeartBeat(pingReq *serializer.NodePingReq) (serializer.NodePingResp, error) { + args := s.Called(pingReq) + return args.Get(0).(serializer.NodePingResp), args.Error(1) +} + +func (s SlaveControllerMock) GetAria2Instance(s2 string) (common.Aria2, error) { + args := s.Called(s2) + return args.Get(0).(common.Aria2), args.Error(1) +} + +func (s SlaveControllerMock) SendNotification(s3 string, s2 string, message mq.Message) error { + args := s.Called(s3, s2, message) + return args.Error(0) +} + +func (s SlaveControllerMock) SubmitTask(s3 string, i interface{}, s2 string, f func(interface{})) error { + args := s.Called(s3, i, s2, f) + return args.Error(0) +} + +func (s SlaveControllerMock) GetMasterInfo(s2 string) (*cluster.MasterInfo, error) { + args := s.Called(s2) + return args.Get(0).(*cluster.MasterInfo), args.Error(1) +} + +func (s SlaveControllerMock) GetOneDriveToken(s2 string, u uint) (string, error) { + args := s.Called(s2, u) + return args.String(0), args.Error(1) +} diff --git a/pkg/mocks/mocks.go b/pkg/mocks/mocks.go index 6b7e674..2b085f1 100644 --- a/pkg/mocks/mocks.go +++ b/pkg/mocks/mocks.go @@ -7,7 +7,6 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/auth" "github.com/cloudreve/Cloudreve/v3/pkg/balancer" "github.com/cloudreve/Cloudreve/v3/pkg/cluster" - "github.com/cloudreve/Cloudreve/v3/pkg/mq" "github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/cloudreve/Cloudreve/v3/pkg/task" @@ -15,40 +14,6 @@ import ( "io" ) -type SlaveControllerMock struct { - testMock.Mock -} - -func (s SlaveControllerMock) HandleHeartBeat(pingReq *serializer.NodePingReq) (serializer.NodePingResp, error) { - args := s.Called(pingReq) - return args.Get(0).(serializer.NodePingResp), args.Error(1) -} - -func (s SlaveControllerMock) GetAria2Instance(s2 string) (common.Aria2, error) { - args := s.Called(s2) - return args.Get(0).(common.Aria2), args.Error(1) -} - -func (s SlaveControllerMock) SendNotification(s3 string, s2 string, message mq.Message) error { - args := s.Called(s3, s2, message) - return args.Error(0) -} - -func (s SlaveControllerMock) SubmitTask(s3 string, i interface{}, s2 string, f func(interface{})) error { - args := s.Called(s3, i, s2, f) - return args.Error(0) -} - -func (s SlaveControllerMock) GetMasterInfo(s2 string) (*cluster.MasterInfo, error) { - args := s.Called(s2) - return args.Get(0).(*cluster.MasterInfo), args.Error(1) -} - -func (s SlaveControllerMock) GetOneDriveToken(s2 string, u uint) (string, error) { - args := s.Called(s2, u) - return args.String(0), args.Error(1) -} - type NodePoolMock struct { testMock.Mock } diff --git a/service/node/fabric.go b/service/node/fabric.go index 63b5ecf..9ad978d 100644 --- a/service/node/fabric.go +++ b/service/node/fabric.go @@ -4,6 +4,7 @@ import ( "encoding/gob" model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/cluster" + "github.com/cloudreve/Cloudreve/v3/pkg/conf" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/onedrive" "github.com/cloudreve/Cloudreve/v3/pkg/mq" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" @@ -54,7 +55,7 @@ func (s *OneDriveCredentialService) Get(c *gin.Context) serializer.Response { return serializer.Err(serializer.CodeInternalSetting, "Cannot initialize OneDrive client", err) } - if err := client.UpdateCredential(c); err != nil { + if err := client.UpdateCredential(c, conf.SystemConfig.Mode == "slave"); err != nil { return serializer.Err(serializer.CodeInternalSetting, "Cannot refresh OneDrive credential", err) }