From c30c19ca3e4ca9fd41901cf23246e6f25122f7bc Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Sun, 1 Mar 2020 16:25:56 +0800 Subject: [PATCH] Feat: create SFC for oss callback --- go.mod | 1 + go.sum | 2 + models/migration.go | 1 + pkg/filesystem/driver/cos/handller.go | 27 +++++- pkg/filesystem/driver/cos/scf.go | 133 ++++++++++++++++++++++++++ pkg/filesystem/upload.go | 2 + pkg/request/request.go | 11 +++ routers/controllers/admin.go | 11 +++ routers/controllers/file.go | 2 + routers/controllers/user.go | 2 + routers/router.go | 4 +- service/admin/policy.go | 36 ++++++- 12 files changed, 229 insertions(+), 3 deletions(-) create mode 100644 pkg/filesystem/driver/cos/scf.go diff --git a/go.mod b/go.mod index 80bc9aa..2677e06 100644 --- a/go.mod +++ b/go.mod @@ -39,6 +39,7 @@ require ( github.com/smartystreets/goconvey v1.6.4 // indirect github.com/speps/go-hashids v2.0.0+incompatible github.com/stretchr/testify v1.4.0 + github.com/tencentcloud/tencentcloud-sdk-go v3.0.125+incompatible github.com/tencentyun/cos-go-sdk-v5 v0.0.0-20200120023323-87ff3bc489ac github.com/upyun/go-sdk v2.1.0+incompatible golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 diff --git a/go.sum b/go.sum index 07cfc29..9c13cbc 100644 --- a/go.sum +++ b/go.sum @@ -210,6 +210,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/tencentcloud/tencentcloud-sdk-go v3.0.125+incompatible h1:dqpmYaez7VBT7PCRBcBxkzlDOiTk7Td8ATiia1b1GuE= +github.com/tencentcloud/tencentcloud-sdk-go v3.0.125+incompatible/go.mod h1:0PfYow01SHPMhKY31xa+EFz2RStxIqj6JFAJS+IkCi4= github.com/tencentyun/cos-go-sdk-v5 v0.0.0-20200120023323-87ff3bc489ac h1:PSBhZblOjdwH7SIVgcue+7OlnLHkM45KuScLZ+PiVbQ= github.com/tencentyun/cos-go-sdk-v5 v0.0.0-20200120023323-87ff3bc489ac/go.mod h1:wQBO5HdAkLjj2q6XQiIfDSP8DXDNrppDRw2Kp/1BODA= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= diff --git a/models/migration.go b/models/migration.go index 33d568d..823e55b 100644 --- a/models/migration.go +++ b/models/migration.go @@ -108,6 +108,7 @@ solid #e9e9e9;"bgcolor="#fff"> 0 { + postPolicy.Conditions = append(postPolicy.Conditions, + []interface{}{"content-length-range", 0, handler.Policy.MaxSize}) + } + res, err := handler.getUploadCredential(ctx, postPolicy, keyTime) if err == nil { res.Callback = apiURL diff --git a/pkg/filesystem/driver/cos/scf.go b/pkg/filesystem/driver/cos/scf.go new file mode 100644 index 0000000..2874f5a --- /dev/null +++ b/pkg/filesystem/driver/cos/scf.go @@ -0,0 +1,133 @@ +package cos + +import ( + "archive/zip" + "bytes" + "encoding/base64" + model "github.com/HFO4/cloudreve/models" + "github.com/HFO4/cloudreve/pkg/hashid" + "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common" + "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/profile" + scf "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/scf/v20180416" + "io" + "io/ioutil" + "net/url" + "strconv" + "strings" + "time" +) + +const scfFunc = `# -*- coding: utf8 -*- +# SCF配置COS触发,向 Cloudreve 发送回调 +from qcloud_cos_v5 import CosConfig +from qcloud_cos_v5 import CosS3Client +from qcloud_cos_v5 import CosServiceError +from qcloud_cos_v5 import CosClientError +import sys +import logging +import requests + +logging.basicConfig(level=logging.INFO, stream=sys.stdout) +logger = logging.getLogger() + + +def main_handler(event, context): + logger.info("start main handler") + for record in event['Records']: + try: + if "x-cos-meta-callback" not in record['cos']['cosObject']['meta']: + logger.info("Cannot find callback URL, skiped.") + return 'Success' + callback = record['cos']['cosObject']['meta']['x-cos-meta-callback'] + key = record['cos']['cosObject']['key'] + logger.info("Callback URL is " + callback) + + r = requests.get(callback) + print(r.text) + + + + except Exception as e: + print(e) + print('Error getting object {} callback url. '.format(key)) + raise e + return "Fail" + + return "Success" +` + +// CreateSCF 创建回调云函数 +func CreateSCF(policy *model.Policy, region string) error { + // 初始化客户端 + credential := common.NewCredential( + policy.AccessKey, + policy.SecretKey, + ) + cpf := profile.NewClientProfile() + client, err := scf.NewClient(credential, region, cpf) + if err != nil { + return err + } + + // 创建回调代码数据 + buff := &bytes.Buffer{} + bs64 := base64.NewEncoder(base64.StdEncoding, buff) + zipWriter := zip.NewWriter(bs64) + header := zip.FileHeader{ + Name: "callback.py", + Method: zip.Deflate, + } + writer, err := zipWriter.CreateHeader(&header) + if err != nil { + return err + } + _, err = io.Copy(writer, strings.NewReader(scfFunc)) + zipWriter.Close() + + // 创建云函数 + req := scf.NewCreateFunctionRequest() + funcName := "cloudreve_" + hashid.HashID(policy.ID, hashid.PolicyID) + strconv.FormatInt(time.Now().Unix(), 10) + zipFileBytes, _ := ioutil.ReadAll(buff) + zipFileStr := string(zipFileBytes) + codeSource := "ZipFile" + handler := "callback.main_handler" + desc := "Cloudreve 用回调函数" + timeout := int64(60) + runtime := "Python3.6" + req.FunctionName = &funcName + req.Code = &scf.Code{ + ZipFile: &zipFileStr, + } + req.Handler = &handler + req.Description = &desc + req.Timeout = &timeout + req.Runtime = &runtime + req.CodeSource = &codeSource + + _, err = client.CreateFunction(req) + if err != nil { + return err + } + + time.Sleep(time.Duration(5) * time.Second) + + // 创建触发器 + server, _ := url.Parse(policy.Server) + triggerType := "cos" + triggerDesc := `{"event":"cos:ObjectCreated:Post","filter":{"Prefix":"","Suffix":""}}` + enable := "OPEN" + + trigger := scf.NewCreateTriggerRequest() + trigger.FunctionName = &funcName + trigger.TriggerName = &server.Host + trigger.Type = &triggerType + trigger.TriggerDesc = &triggerDesc + trigger.Enable = &enable + + _, err = client.CreateTrigger(trigger) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/filesystem/upload.go b/pkg/filesystem/upload.go index 49e6108..ee446ef 100644 --- a/pkg/filesystem/upload.go +++ b/pkg/filesystem/upload.go @@ -6,6 +6,7 @@ import ( "github.com/HFO4/cloudreve/pkg/cache" "github.com/HFO4/cloudreve/pkg/filesystem/driver/local" "github.com/HFO4/cloudreve/pkg/filesystem/fsctx" + "github.com/HFO4/cloudreve/pkg/request" "github.com/HFO4/cloudreve/pkg/serializer" "github.com/HFO4/cloudreve/pkg/util" "github.com/gin-gonic/gin" @@ -26,6 +27,7 @@ func (fs *FileSystem) Upload(ctx context.Context, file FileHeader) (err error) { // 上传前的钩子 err = fs.Trigger(ctx, "BeforeUpload") if err != nil { + request.BlackHole(file) return err } diff --git a/pkg/request/request.go b/pkg/request/request.go index aaf69d2..80c17e2 100644 --- a/pkg/request/request.go +++ b/pkg/request/request.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + model "github.com/HFO4/cloudreve/models" "github.com/HFO4/cloudreve/pkg/auth" "github.com/HFO4/cloudreve/pkg/serializer" "github.com/HFO4/cloudreve/pkg/util" @@ -271,3 +272,13 @@ func (instance NopRSCloser) Seek(offset int64, whence int) (int64, error) { return 0, errors.New("未实现") } + +// BlackHole 将客户端发来的数据放入黑洞 +func BlackHole(r io.Reader) { + if !model.IsTrueVal(model.GetSettingByName("reset_after_upload_failed")) { + _, err := io.Copy(ioutil.Discard, r) + if err != nil { + util.Log().Debug("黑洞数据出错,%s", err) + } + } +} diff --git a/routers/controllers/admin.go b/routers/controllers/admin.go index 80ba49d..445497d 100644 --- a/routers/controllers/admin.go +++ b/routers/controllers/admin.go @@ -183,3 +183,14 @@ func AdminAddCORS(c *gin.Context) { c.JSON(200, ErrorResponse(err)) } } + +// AdminAddCORS 创建跨域策略 +func AdminAddSCF(c *gin.Context) { + var service admin.PolicyService + if err := c.ShouldBindJSON(&service); err == nil { + res := service.AddSCF() + c.JSON(200, res) + } else { + c.JSON(200, ErrorResponse(err)) + } +} diff --git a/routers/controllers/file.go b/routers/controllers/file.go index 0a64087..c7bb701 100644 --- a/routers/controllers/file.go +++ b/routers/controllers/file.go @@ -8,6 +8,7 @@ import ( "github.com/HFO4/cloudreve/pkg/filesystem" "github.com/HFO4/cloudreve/pkg/filesystem/driver/local" "github.com/HFO4/cloudreve/pkg/filesystem/fsctx" + "github.com/HFO4/cloudreve/pkg/request" "github.com/HFO4/cloudreve/pkg/serializer" "github.com/HFO4/cloudreve/service/explorer" "github.com/gin-gonic/gin" @@ -276,6 +277,7 @@ func FileUploadStream(c *gin.Context) { // 非可用策略时拒绝上传 if user, ok := c.Get("user"); ok && !user.(*model.User).Policy.IsTransitUpload(fileSize) { + request.BlackHole(c.Request.Body) c.JSON(200, serializer.Err(serializer.CodePolicyNotAllowed, "当前存储策略无法使用", nil)) return } diff --git a/routers/controllers/user.go b/routers/controllers/user.go index 858da99..c91ce80 100644 --- a/routers/controllers/user.go +++ b/routers/controllers/user.go @@ -6,6 +6,7 @@ import ( model "github.com/HFO4/cloudreve/models" "github.com/HFO4/cloudreve/pkg/authn" "github.com/HFO4/cloudreve/pkg/qq" + "github.com/HFO4/cloudreve/pkg/request" "github.com/HFO4/cloudreve/pkg/serializer" "github.com/HFO4/cloudreve/pkg/thumb" "github.com/HFO4/cloudreve/pkg/util" @@ -298,6 +299,7 @@ func UploadAvatar(c *gin.Context) { // 取得头像上传大小限制 maxSize := model.GetIntSetting("avatar_size", 2097152) if c.Request.ContentLength == -1 || c.Request.ContentLength > int64(maxSize) { + request.BlackHole(c.Request.Body) c.JSON(200, serializer.Err(serializer.CodeUploadFailed, "头像尺寸太大", nil)) return } diff --git a/routers/router.go b/routers/router.go index 4607c71..1abe503 100644 --- a/routers/router.go +++ b/routers/router.go @@ -334,8 +334,10 @@ func InitMasterRouter() *gin.Engine { policy.POST("test/slave", controllers.AdminTestSlave) // 创建存储策略 policy.POST("", controllers.AdminAddPolicy) - // 创建存储策略 + // 创建跨域策略 policy.POST("cors", controllers.AdminAddCORS) + // 创建COS回调函数 + policy.POST("scf", controllers.AdminAddSCF) } } diff --git a/service/admin/policy.go b/service/admin/policy.go index 24c806c..bc7187c 100644 --- a/service/admin/policy.go +++ b/service/admin/policy.go @@ -7,10 +7,13 @@ import ( model "github.com/HFO4/cloudreve/models" "github.com/HFO4/cloudreve/pkg/auth" "github.com/HFO4/cloudreve/pkg/conf" + "github.com/HFO4/cloudreve/pkg/filesystem/driver/cos" "github.com/HFO4/cloudreve/pkg/filesystem/driver/oss" "github.com/HFO4/cloudreve/pkg/request" "github.com/HFO4/cloudreve/pkg/serializer" "github.com/HFO4/cloudreve/pkg/util" + cossdk "github.com/tencentyun/cos-go-sdk-v5" + "net/http" "net/url" "os" "path/filepath" @@ -40,7 +43,22 @@ type AddPolicyService struct { // PolicyService 存储策略ID服务 type PolicyService struct { - ID uint `json:"id" binding:"required"` + ID uint `json:"id" binding:"required"` + Region string `json:"region"` +} + +// AddSCF 创建回调云函数 +func (service *PolicyService) AddSCF() serializer.Response { + policy, err := model.GetPolicyByID(service.ID) + if err != nil { + return serializer.Err(serializer.CodeNotFound, "存储策略不存在", nil) + } + + if err := cos.CreateSCF(&policy, service.Region); err != nil { + return serializer.Err(serializer.CodeInternalSetting, "云函数创建失败", err) + } + + return serializer.Response{} } // AddCORS 创建跨域策略 @@ -59,6 +77,22 @@ func (service *PolicyService) AddCORS() serializer.Response { if err := handler.CORS(); err != nil { return serializer.Err(serializer.CodeInternalSetting, "跨域策略添加失败", err) } + case "cos": + u, _ := url.Parse(policy.Server) + b := &cossdk.BaseURL{BucketURL: u} + handler := cos.Driver{ + Policy: &policy, + HTTPClient: request.HTTPClient{}, + Client: cossdk.NewClient(b, &http.Client{ + Transport: &cossdk.AuthorizationTransport{ + SecretID: policy.AccessKey, + SecretKey: policy.SecretKey, + }, + }), + } + if err := handler.CORS(); err != nil { + return serializer.Err(serializer.CodeInternalSetting, "跨域策略添加失败", err) + } default: return serializer.ParamErr("不支持此策略", nil) }