diff --git a/aws/pusher/config.go b/aws/pusher/config.go
new file mode 100644
index 0000000..a952866
--- /dev/null
+++ b/aws/pusher/config.go
@@ -0,0 +1,469 @@
+package pusher
+
+import (
+	"os"
+	"path/filepath"
+	"regexp"
+	"time"
+
+	sdkaws "github.com/aws/aws-sdk-go-v2/aws"
+	sdksss "github.com/aws/aws-sdk-go-v2/service/s3"
+	sdktps "github.com/aws/aws-sdk-go-v2/service/s3/types"
+	libsiz "github.com/nabbar/golib/size"
+)
+
+var nonAlphanumericRegex = regexp.MustCompile(`[^a-zA-Z0-9 ]+`)
+
+func clearString(str string) string {
+	return nonAlphanumericRegex.ReplaceAllString(str, "")
+}
+
+type Config struct {
+	// FuncGetClientS3 is a function type that returns a
+	// *github.com/aws/aws-sdk-go-v2/service/s3.Client instance.
+	FuncGetClientS3 FuncClientS3
+	// FuncCallOnUpload is a callback executed when a part or a complete object is uploaded.
+	//
+	// fct: is a function type that takes three parameters:
+	//	upd: UploadInfo containing information about the last uploaded part,
+	//	obj: ObjectInfo containing information about the object being uploaded,
+	//	e: error containing any error that occurred during the upload.
+	FuncCallOnUpload FuncOnUpload
+	// FuncCallOnAbort is a callback executed when the upload is aborted or Close is called.
+	//
+	// fct: is a function type that takes two parameters:
+	//	obj: ObjectInfo containing information about the object being uploaded,
+	//	e: error containing any error that occurred during the upload.
+	FuncCallOnAbort FuncOnFinish
+	// FuncCallOnComplete is a callback executed when the upload is completed.
+	//
+	// fct: is a function type that takes two parameters:
+	//	obj: ObjectInfo containing information about the object being uploaded,
+	//	e: error containing any error that occurred during the upload.
+	FuncCallOnComplete FuncOnFinish
+	// WorkingPath represents the working path used for the part file. If the part file already exists, it is overwritten / truncated.
+	// Uploaded content is appended to this working file, which grows up to the size of one part.
+	// Once the part size is reached, the current part is uploaded and the file is truncated. The multipart upload (MPU) is created when the first part is uploaded.
+	// If the Complete method is called before the part size has been reached, the upload is converted to a standard put object and the MPU flag is set to false.
+	//
+	// WorkingPath can be a relative or absolute path, a directory or a file.
+	// If a directory is given, the part file is created in that directory with a unique name based on the bucket name and a random suffix.
+	WorkingPath string
+	// PartSize defines the size of each part for a multipart upload, or the maximum size for a standard put object.
+	// The part size must be at least 5 MB and at most 5 GB.
+	PartSize libsiz.Size
+	// BufferSize defines the buffer size for the pusher.
+	// This buffer is used by the ReadFrom method.
+	BufferSize int
+	// CheckSum enables or disables the checksum for the pusher.
+	// This is an integrity check of the uploaded parts and object.
+	// The checksum method used is SHA-256 when true.
+	CheckSum bool
+
+	// ObjectS3Options defines the S3 options for the object.
+ ObjectS3Options struct { + // The name of the bucket where the multipart upload is initiated and where the + // object is uploaded. + // + // Directory buckets - When you use this operation with a directory bucket, you + // must use virtual-hosted-style requests in the format + // Bucket_name.s3express-az_id.region.amazonaws.com . Path-style requests are not + // supported. Directory bucket names must be unique in the chosen Availability + // Zone. Bucket names must follow the format bucket_base_name--az-id--x-s3 (for + // example, DOC-EXAMPLE-BUCKET--usw2-az1--x-s3 ). For information about bucket + // naming restrictions, see [Directory bucket naming rules]in the Amazon S3 User Guide. + // + // Access points - When you use this action with an access point, you must provide + // the alias of the access point in place of the bucket name or specify the access + // point ARN. When using the access point ARN, you must direct requests to the + // access point hostname. The access point hostname takes the form + // AccessPointName-AccountId.s3-accesspoint.Region.amazonaws.com. When using this + // action with an access point through the Amazon Web Services SDKs, you provide + // the access point ARN in place of the bucket name. For more information about + // access point ARNs, see [Using access points]in the Amazon S3 User Guide. + // + // Access points and Object Lambda access points are not supported by directory + // buckets. + // + // S3 on Outposts - When you use this action with Amazon S3 on Outposts, you must + // direct requests to the S3 on Outposts hostname. The S3 on Outposts hostname + // takes the form + // AccessPointName-AccountId.outpostID.s3-outposts.Region.amazonaws.com . When you + // use this action with S3 on Outposts through the Amazon Web Services SDKs, you + // provide the Outposts access point ARN in place of the bucket name. For more + // information about S3 on Outposts ARNs, see [What is S3 on Outposts?]in the Amazon S3 User Guide. + // + // [Directory bucket naming rules]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/directory-bucket-naming-rules.html + // [What is S3 on Outposts?]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/S3onOutposts.html + // [Using access points]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-access-points.html + // + // This member is required. + Bucket *string + + // Object key for which the multipart upload is to be initiated. + // + // This member is required. + Key *string + + // The canned ACL to apply to the object. For more information, see [Canned ACL] in the Amazon + // S3 User Guide. + // + // When adding a new object, you can use headers to grant ACL-based permissions to + // individual Amazon Web Services accounts or to predefined groups defined by + // Amazon S3. These permissions are then added to the ACL on the object. By + // default, all objects are private. Only the owner has full access control. For + // more information, see [Access Control List (ACL) Overview]and [Managing ACLs Using the REST API] in the Amazon S3 User Guide. + // + // If the bucket that you're uploading objects to uses the bucket owner enforced + // setting for S3 Object Ownership, ACLs are disabled and no longer affect + // permissions. Buckets that use this setting only accept PUT requests that don't + // specify an ACL or PUT requests that specify bucket owner full control ACLs, such + // as the bucket-owner-full-control canned ACL or an equivalent form of this ACL + // expressed in the XML format. 
PUT requests that contain other ACLs (for example, + // custom grants to certain Amazon Web Services accounts) fail and return a 400 + // error with the error code AccessControlListNotSupported . For more information, + // see [Controlling ownership of objects and disabling ACLs]in the Amazon S3 User Guide. + // + // - This functionality is not supported for directory buckets. + // + // - This functionality is not supported for Amazon S3 on Outposts. + // + // [Managing ACLs Using the REST API]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-using-rest-api.html + // [Access Control List (ACL) Overview]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html + // [Canned ACL]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#CannedACL + // [Controlling ownership of objects and disabling ACLs]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html + ACL sdktps.ObjectCannedACL + + // Can be used to specify caching behavior along the request/reply chain. For more + // information, see [http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9]. + // + // [http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9]: http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9 + CacheControl *string + + // Specifies presentational information for the object. For more information, see [https://www.rfc-editor.org/rfc/rfc6266#section-4]. + // + // [https://www.rfc-editor.org/rfc/rfc6266#section-4]: https://www.rfc-editor.org/rfc/rfc6266#section-4 + ContentDisposition *string + + // Specifies what content encodings have been applied to the object and thus what + // decoding mechanisms must be applied to obtain the media-type referenced by the + // Content-Type header field. For more information, see [https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding]. + // + // [https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding]: https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding + ContentEncoding *string + + // The language the content is in. + ContentLanguage *string + + // A standard MIME type describing the format of the contents. For more + // information, see [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type]. + // + // [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type]: https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type + ContentType *string + + // The date and time at which the object is no longer cacheable. For more + // information, see [https://www.rfc-editor.org/rfc/rfc7234#section-5.3]. + // + // [https://www.rfc-editor.org/rfc/rfc7234#section-5.3]: https://www.rfc-editor.org/rfc/rfc7234#section-5.3 + Expires *time.Time + + // Gives the grantee READ, READ_ACP, and WRITE_ACP permissions on the object. + // + // - This functionality is not supported for directory buckets. + // + // - This functionality is not supported for Amazon S3 on Outposts. + GrantFullControl *string + + // Allows grantee to read the object data and its metadata. + // + // - This functionality is not supported for directory buckets. + // + // - This functionality is not supported for Amazon S3 on Outposts. + GrantRead *string + + // Allows grantee to read the object ACL. + // + // - This functionality is not supported for directory buckets. + // + // - This functionality is not supported for Amazon S3 on Outposts. + GrantReadACP *string + + // Allows grantee to write the ACL for the applicable object. + // + // - This functionality is not supported for directory buckets. 
+ // + // - This functionality is not supported for Amazon S3 on Outposts. + GrantWriteACP *string + + // A map of metadata to store with the object in S3. + Metadata map[string]string + + // Specifies whether a legal hold will be applied to this object. For more + // information about S3 Object Lock, see [Object Lock]in the Amazon S3 User Guide. + // + // This functionality is not supported for directory buckets. + // + // [Object Lock]: https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lock.html + ObjectLockLegalHoldStatus sdktps.ObjectLockLegalHoldStatus + + // The Object Lock mode that you want to apply to this object. + // + // This functionality is not supported for directory buckets. + ObjectLockMode sdktps.ObjectLockMode + + // The date and time when you want this object's Object Lock to expire. Must be + // formatted as a timestamp parameter. + // + // This functionality is not supported for directory buckets. + ObjectLockRetainUntilDate *time.Time + + // By default, Amazon S3 uses the STANDARD Storage Class to store newly created + // objects. The STANDARD storage class provides high durability and high + // availability. Depending on performance needs, you can specify a different + // Storage Class. For more information, see [Storage Classes]in the Amazon S3 User Guide. + // + // - For directory buckets, only the S3 Express One Zone storage class is + // supported to store newly created objects. + // + // - Amazon S3 on Outposts only uses the OUTPOSTS Storage Class. + // + // [Storage Classes]: https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html + StorageClass sdktps.StorageClass + + // The tag-set for the object. The tag-set must be encoded as URL Query + // parameters. (For example, "Key1=Value1") + // + // This functionality is not supported for directory buckets. + Tagging *string + + // If the bucket is configured as a website, redirects requests for this object to + // another object in the same bucket or to an external URL. Amazon S3 stores the + // value of this header in the object metadata. For information about object + // metadata, see [Object Key and Metadata]in the Amazon S3 User Guide. + // + // In the following example, the request header sets the redirect to an object + // (anotherPage.html) in the same bucket: + // + // x-amz-website-redirect-location: /anotherPage.html + // + // In the following example, the request header sets the object redirect to + // another website: + // + // x-amz-website-redirect-location: http://www.example.com/ + // + // For more information about website hosting in Amazon S3, see [Hosting Websites on Amazon S3] and [How to Configure Website Page Redirects] in the + // Amazon S3 User Guide. + // + // This functionality is not supported for directory buckets. 
+ // + // [How to Configure Website Page Redirects]: https://docs.aws.amazon.com/AmazonS3/latest/dev/how-to-page-redirect.html + // [Hosting Websites on Amazon S3]: https://docs.aws.amazon.com/AmazonS3/latest/dev/WebsiteHosting.html + // [Object Key and Metadata]: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html + WebsiteRedirectLocation *string + } +} + +func (o *Config) getClientS3() *sdksss.Client { + if o == nil { + return nil + } else if f := o.FuncGetClientS3; f == nil { + return nil + } else { + return f() + } +} + +func (o *Config) onUpload(upd UploadInfo, obj ObjectInfo, e error) { + if o == nil { + return + } else if f := o.FuncCallOnUpload; f != nil { + f(upd, obj, e) + } +} + +func (o *Config) onAbort(obj ObjectInfo, e error) { + if o == nil { + return + } else if f := o.FuncCallOnAbort; f != nil { + f(obj, e) + } +} + +func (o *Config) onComplete(obj ObjectInfo, e error) { + if o == nil { + return + } else if f := o.FuncCallOnComplete; f != nil { + f(obj, e) + } +} + +func (o *Config) getPartSize() libsiz.Size { + if o == nil { + return PartSizeMinimal + } + + if o.PartSize < PartSizeMinimal { + o.PartSize = PartSizeMinimal + } else if o.PartSize > PartSizeMaximal { + o.PartSize = PartSizeMaximal + } + + return o.PartSize +} + +func (o *Config) getBufferSize() int { + if s := o.BufferSize; s < 512 { + return 512 + } else if s > 1024*1024 { + return 1024 * 1024 + } else { + return s + } +} + +func (o *Config) isCheckSum() bool { + return o.CheckSum +} + +func (o *Config) getUploadPartInput() *sdksss.UploadPartInput { + var chk sdktps.ChecksumAlgorithm + if o.CheckSum { + chk = sdktps.ChecksumAlgorithmSha256 + } + + return &sdksss.UploadPartInput{ + Bucket: o.ObjectS3Options.Bucket, + Key: o.ObjectS3Options.Key, + ChecksumAlgorithm: chk, + } +} + +func (o *Config) getPutObjectInput() *sdksss.PutObjectInput { + var chk sdktps.ChecksumAlgorithm + if o.CheckSum { + chk = sdktps.ChecksumAlgorithmSha256 + } + + return &sdksss.PutObjectInput{ + Bucket: o.ObjectS3Options.Bucket, + Key: o.ObjectS3Options.Key, + ACL: o.ObjectS3Options.ACL, + CacheControl: o.ObjectS3Options.CacheControl, + ChecksumAlgorithm: chk, + ContentDisposition: o.ObjectS3Options.ContentDisposition, + ContentEncoding: o.ObjectS3Options.ContentEncoding, + ContentLanguage: o.ObjectS3Options.ContentLanguage, + ContentType: o.ObjectS3Options.ContentType, + Expires: o.ObjectS3Options.Expires, + GrantFullControl: o.ObjectS3Options.GrantFullControl, + GrantRead: o.ObjectS3Options.GrantRead, + GrantReadACP: o.ObjectS3Options.GrantReadACP, + GrantWriteACP: o.ObjectS3Options.GrantWriteACP, + Metadata: o.ObjectS3Options.Metadata, + ObjectLockLegalHoldStatus: o.ObjectS3Options.ObjectLockLegalHoldStatus, + ObjectLockMode: o.ObjectS3Options.ObjectLockMode, + ObjectLockRetainUntilDate: o.ObjectS3Options.ObjectLockRetainUntilDate, + StorageClass: o.ObjectS3Options.StorageClass, + Tagging: o.ObjectS3Options.Tagging, + WebsiteRedirectLocation: o.ObjectS3Options.WebsiteRedirectLocation, + } +} + +func (o *Config) getListMultipartUploadsInput() *sdksss.ListMultipartUploadsInput { + return &sdksss.ListMultipartUploadsInput{ + Bucket: o.ObjectS3Options.Bucket, + MaxUploads: sdkaws.Int32(1000), + } +} + +func (o *Config) getCompleteMultipartUploadInput() *sdksss.CompleteMultipartUploadInput { + return &sdksss.CompleteMultipartUploadInput{ + Bucket: o.ObjectS3Options.Bucket, + Key: o.ObjectS3Options.Key, + MultipartUpload: &sdktps.CompletedMultipartUpload{ + Parts: make([]sdktps.CompletedPart, 0), + }, + } +} + +func (o 
*Config) getAbortMultipartUploadInput() *sdksss.AbortMultipartUploadInput {
+	return &sdksss.AbortMultipartUploadInput{
+		Bucket: o.ObjectS3Options.Bucket,
+		Key:    o.ObjectS3Options.Key,
+	}
+}
+
+func (o *Config) getWorkingFile() (*os.File, error) {
+	inf, err := o.getWorkingPath()
+	if err != nil {
+		return nil, err
+	} else if inf.IsDir() {
+		return o.createTempWorkingFile()
+	} else {
+		// using the configured path: os.FileInfo.Name() only returns the base name
+		return os.Create(o.WorkingPath)
+	}
+}
+
+func (o *Config) getWorkingPath() (os.FileInfo, error) {
+	if o.WorkingPath == "" {
+		o.WorkingPath = os.TempDir()
+	}
+
+	var (
+		err error
+		inf os.FileInfo
+	)
+
+	if inf, err = os.Stat(o.WorkingPath); err != nil && os.IsNotExist(err) {
+		if _, err = os.Stat(filepath.Dir(o.WorkingPath)); err != nil {
+			return nil, err
+		} else if h, e := os.Create(o.WorkingPath); e != nil {
+			return nil, e
+		} else {
+			if e = h.Close(); e != nil {
+				return nil, e
+			} else if inf, err = os.Stat(o.WorkingPath); err != nil {
+				return nil, err
+			}
+		}
+	} else if err != nil {
+		return nil, err
+	}
+
+	if inf == nil {
+		return nil, os.ErrInvalid
+	} else if !inf.IsDir() && !inf.Mode().IsRegular() {
+		return nil, os.ErrInvalid
+	} else if inf.IsDir() {
+		// probing the directory for writability with a short-lived temporary file
+		if h, e := os.CreateTemp(o.WorkingPath, "chk_*"); e != nil {
+			return nil, e
+		} else {
+			n := h.Name()
+			if e = h.Close(); e != nil {
+				return nil, e
+			} else if e = os.Remove(n); e != nil {
+				return nil, e
+			}
+		}
+	}
+
+	return inf, nil
+}
+
+func (o *Config) createTempWorkingFile() (*os.File, error) {
+	var pfx string
+
+	if o.ObjectS3Options.Bucket != nil && len(*o.ObjectS3Options.Bucket) > 0 {
+		pfx = clearString(*o.ObjectS3Options.Bucket) + "_*"
+	} else if o.ObjectS3Options.Key != nil && len(*o.ObjectS3Options.Key) > 0 {
+		pfx = clearString(*o.ObjectS3Options.Key) + "_*"
+	} else {
+		pfx = "obj_*"
+	}
+
+	// creating the part file in the configured working directory
+	return os.CreateTemp(o.WorkingPath, pfx)
+}
diff --git a/aws/pusher/errors.go b/aws/pusher/errors.go
new file mode 100644
index 0000000..492fdf2
--- /dev/null
+++ b/aws/pusher/errors.go
@@ -0,0 +1,15 @@
+package pusher
+
+import "fmt"
+
+var (
+	ErrInvalidInstance           = fmt.Errorf("invalid instance")
+	ErrBytesWrote                = fmt.Errorf("at least one byte is still pending write")
+	ErrInvalidClient             = fmt.Errorf("invalid aws S3 client")
+	ErrInvalidResponse           = fmt.Errorf("invalid aws S3 response")
+	ErrInvalidUploadID           = fmt.Errorf("invalid aws s3 MPU Upload ID")
+	ErrInvalidTMPFile            = fmt.Errorf("invalid working or temporary file")
+	ErrEmptyContents             = fmt.Errorf("no given contents")
+	ErrInvalidChecksum           = fmt.Errorf("invalid checksum calculation for given data")
+	ErrWorkingPartFileExceedSize = fmt.Errorf("working or temporary file exceeds the aws S3 size limits")
+)
diff --git a/aws/pusher/file.go b/aws/pusher/file.go
new file mode 100644
index 0000000..b77d901
--- /dev/null
+++ b/aws/pusher/file.go
@@ -0,0 +1,97 @@
+package pusher
+
+import (
+	"io"
+	"os"
+
+	sdksss "github.com/aws/aws-sdk-go-v2/service/s3"
+)
+
+func (o *psh) getFile() (*os.File, error) {
+	if o == nil {
+		return nil, ErrInvalidInstance
+	} else if i := o.tmp.Load(); i != nil {
+		// returning the cached working file only when the type assertion succeeds
+		if v, k := i.(*os.File); k {
+			return v, nil
+		}
+	}
+
+	if f, e := o.cfg.getWorkingFile(); e != nil {
+		return nil, e
+	} else if f == nil {
+		return nil, ErrInvalidTMPFile
+	} else {
+		o.tmp.Store(f)
+		return f, nil
+	}
+}
+
+func (o *psh) fileReset() error {
+	if f, e := o.getFile(); e != nil {
+		return e
+	} else {
+		_, e = f.Seek(0, io.SeekStart)
+		o.tmp.Store(f)
+		return e
+	}
+}
+
+func (o *psh) fileTruncate() error {
+	if f, e := o.getFile(); e != nil {
+		return e
+	} else {
+		o.prtSize.Store(0)
+		if e = f.Truncate(0); e == nil {
+			// rewinding the cursor: Truncate does not move the file offset
+			_, e = f.Seek(0, io.SeekStart)
+		}
+		o.tmp.Store(f)
+		return e
+	}
+}
+
+func (o *psh) fileClose() error {
+	if f, e := o.getFile(); e != nil {
+		return e
+	} else {
+		e = f.Close()
+		o.tmp.Store(f)
+		return e
+	}
+}
+
+func (o *psh) fileRemove() error {
+	if f, e := o.getFile(); e != nil {
+		return e
+	} else {
+		n := f.Name()
+		e = o.fileClose()
+
+		// resetting all counters and upload states
+		o.prtList.Store(make([]sdksss.UploadPartOutput, 0))
+		o.prtSize.Store(0)
+		o.objSize.Store(0)
+		o.nbrPart.Store(0)
+		o.updInfo.Store(&sdksss.CreateMultipartUploadOutput{})
+		o.objInfo.Store(&sdksss.CreateMultipartUploadInput{})
+
+		_ = o.md5Reset()
+		_ = o.shaPartReset()
+
+		if er := os.Remove(n); er != nil && e != nil {
+			return e
+		} else if er != nil {
+			return er
+		}
+
+		return nil
+	}
+}
+
+func (o *psh) fileWrite(p []byte) (n int, err error) {
+	if f, e := o.getFile(); e != nil {
+		return 0, e
+	} else {
+		n, err = f.Write(p)
+
+		// updating size counters
+		o.prtSize.Add(int64(n))
+		o.objSize.Add(int64(n))
+
+		return n, err
+	}
+}
diff --git a/aws/pusher/hash.go b/aws/pusher/hash.go
new file mode 100644
index 0000000..08141c0
--- /dev/null
+++ b/aws/pusher/hash.go
@@ -0,0 +1,198 @@
+package pusher
+
+import (
+	"crypto/md5"
+	"crypto/sha256"
+	"hash"
+)
+
+func (o *psh) md5Get() (hash.Hash, error) {
+	if o == nil {
+		return nil, ErrInvalidInstance
+	} else if i := o.prtMD5.Load(); i != nil {
+		if v, k := i.(hash.Hash); k {
+			return v, nil
+		}
+	}
+
+	return o.md5Init()
+}
+
+func (o *psh) md5Init() (hash.Hash, error) {
+	if o == nil {
+		return nil, ErrInvalidInstance
+	} else {
+		return md5.New(), nil
+	}
+}
+
+func (o *psh) md5Reset() error {
+	if o == nil {
+		return ErrInvalidInstance
+	} else if h, e := o.md5Init(); e != nil {
+		return e
+	} else if i := o.prtMD5.Swap(h); i == nil {
+		return nil
+	} else if v, k := i.(hash.Hash); !k {
+		return nil
+	} else {
+		v.Reset()
+		return nil
+	}
+}
+
+func (o *psh) md5Write(p []byte) (n int, err error) {
+	var h hash.Hash
+
+	if o == nil {
+		return 0, ErrInvalidInstance
+	} else if h, err = o.md5Get(); err != nil {
+		return 0, err
+	} else {
+		n, err = h.Write(p)
+		o.prtMD5.Store(h)
+		return n, err
+	}
+}
+
+func (o *psh) md5Checksum() ([]byte, error) {
+	if o == nil {
+		return make([]byte, 0), ErrInvalidInstance
+	} else if i := o.prtMD5.Load(); i == nil {
+		return make([]byte, 0), ErrInvalidInstance
+	} else if v, k := i.(hash.Hash); !k {
+		return make([]byte, 0), ErrInvalidInstance
+	} else {
+		return v.Sum(nil), nil
+	}
+}
+
+func (o *psh) shaPartGet() (hash.Hash, error) {
+	if o == nil {
+		return nil, ErrInvalidInstance
+	} else if !o.cfg.isCheckSum() {
+		return nil, nil
+	} else if i := o.prtSha2.Load(); i != nil {
+		if v, k := i.(hash.Hash); k {
+			return v, nil
+		}
+	}
+
+	return o.shaPartInit()
+}
+
+func (o *psh) shaPartInit() (hash.Hash, error) {
+	if o == nil {
+		return nil, ErrInvalidInstance
+	} else {
+		return sha256.New(), nil
+	}
+}
+
+func (o *psh) shaPartReset() error {
+	if o == nil {
+		return ErrInvalidInstance
+	} else if !o.cfg.isCheckSum() {
+		return nil
+	} else if h, e := o.shaPartInit(); e != nil {
+		return e
+	} else if i := o.prtSha2.Swap(h); i == nil {
+		return nil
+	} else if v, k := i.(hash.Hash); !k {
+		return nil
+	} else {
+		v.Reset()
+		return nil
+	}
+}
+
+func (o *psh) shaPartWrite(p []byte) (n int, err error) {
+	var h hash.Hash
+
+	if o == nil {
+		return 0, ErrInvalidInstance
+	} else if !o.cfg.isCheckSum() {
+		return len(p), nil
+	} else if h, err = o.shaPartGet(); err != nil {
+		return 0, err
+	} else {
+		n, err = h.Write(p)
+		o.prtSha2.Store(h)
+		return n, err
+	}
+}
+
+func (o *psh) shaPartChecksum() ([]byte, error) {
+	if o == nil {
+		return nil, ErrInvalidInstance
+	} else if !o.cfg.isCheckSum() {
+		return nil, nil
+	} else if i := o.prtSha2.Load(); i == nil {
+		return nil, ErrInvalidInstance
+	} else if v, k := i.(hash.Hash); !k {
+		return nil, ErrInvalidInstance
+	} else {
+		return v.Sum(nil), nil
+	}
+}
+
+func (o *psh) shaObjGet() (hash.Hash, error) {
+	if o == nil {
+		return nil, ErrInvalidInstance
+	} else if !o.cfg.isCheckSum() {
+		return nil, nil
+	} else if i := o.objSha2.Load(); i != nil {
+		if v, k := i.(hash.Hash); k {
+			return v, nil
+		}
+	}
+
+	return o.shaObjInit()
+}
+
+func (o *psh) shaObjInit() (hash.Hash, error) {
+	if o == nil {
+		return nil, ErrInvalidInstance
+	} else {
+		return sha256.New(), nil
+	}
+}
+
+func (o *psh) shaObjWrite(p []byte) (n int, err error) {
+	var h hash.Hash
+
+	if o == nil {
+		return 0, ErrInvalidInstance
+	} else if !o.cfg.isCheckSum() {
+		return len(p), nil
+	} else if h, err = o.shaObjGet(); err != nil {
+		return 0, err
+	} else {
+		n, err = h.Write(p)
+		o.objSha2.Store(h)
+		return n, err
+	}
+}
+
+func (o *psh) shaObjChecksum() ([]byte, error) {
+	if o == nil {
+		return nil, ErrInvalidInstance
+	} else if !o.cfg.isCheckSum() {
+		return nil, nil
+	} else if i := o.objSha2.Load(); i == nil {
+		return nil, ErrInvalidInstance
+	} else if v, k := i.(hash.Hash); !k {
+		return nil, ErrInvalidInstance
+	} else {
+		return v.Sum(nil), nil
+	}
+}
diff --git a/aws/pusher/info.go b/aws/pusher/info.go
new file mode 100644
index 0000000..55aab4c
--- /dev/null
+++ b/aws/pusher/info.go
@@ -0,0 +1,129 @@
+package pusher
+
+import (
+	sdksss "github.com/aws/aws-sdk-go-v2/service/s3"
+	libsiz "github.com/nabbar/golib/size"
+)
+
+const (
+	PartSizeMinimal       = 5 * libsiz.SizeMega
+	PartSizeMaximal       = 5 * libsiz.SizeGiga
+	MaxNumberPart   int32 = 10000
+	MaxObjectSize         = 5 * libsiz.SizeTera
+)
+
+func (o *psh) GetPartSize() libsiz.Size {
+	if o == nil {
+		return PartSizeMinimal
+	} else {
+		return o.cfg.getPartSize()
+	}
+}
+
+func (o *psh) GetObjectSize() libsiz.Size {
+	if o == nil {
+		return 0
+	} else {
+		return libsiz.SizeFromInt64(o.objSize.Load())
+	}
+}
+
+func (o *psh) GetObjectSizeLeft() libsiz.Size {
+	return MaxObjectSize - o.GetObjectSize()
+}
+
+func (o *psh) GetObjectInfo() ObjectInfo {
+	if o == nil {
+		return ObjectInfo{}
+	} else if i := o.objInfo.Load(); i == nil {
+		return ObjectInfo{}
+	} else if v, k := i.(*sdksss.CreateMultipartUploadInput); !k {
+		return ObjectInfo{}
+	} else {
+		nfo := ObjectInfo{
+			IsMPU:      o.IsMPU(),
+			TotalSize:  o.GetObjectSize(),
+			NumberPart: o.Counter(),
+		}
+
+		if v.Bucket != nil && len(*v.Bucket) > 0 {
+			nfo.Bucket = *v.Bucket
+		}
+
+		if v.Key != nil && len(*v.Key) > 0 {
+			nfo.Object = *v.Key
+		}
+
+		return nfo
+	}
+}
+
+func (o *psh) GetLastPartInfo() UploadInfo {
+	// reading the stored upload info directly: calling getUploadId here would create an MPU as a side effect
+	if o == nil {
+		return UploadInfo{}
+	} else if i := o.updInfo.Load(); i == nil {
+		return UploadInfo{}
+	} else if u, k := i.(*sdksss.CreateMultipartUploadOutput); !k || u.UploadId == nil || len(*u.UploadId) < 1 {
+		return UploadInfo{}
+	} else if l := o.prtList.Load(); l == nil {
+		return UploadInfo{}
+	} else if v, k := l.([]sdksss.UploadPartOutput); !k {
+		return UploadInfo{}
+	} else if len(v) < 1 {
+		return UploadInfo{}
+	} else if p := v[len(v)-1]; p.ETag == nil || len(*p.ETag) < 1 {
+		return UploadInfo{}
+	} else {
+		nfo := UploadInfo{
+			IsMPU:      o.IsMPU(),
+			PartNumber: o.nbrPart.Load(),
+			UploadID:   *u.UploadId,
+			Etag:       *p.ETag,
+		}
+
+		// the SHA-256 checksum is only present when checksum computation is enabled
+		if p.ChecksumSHA256 != nil {
+			nfo.Checksum = *p.ChecksumSHA256
+		}
+
+		return nfo
+	}
+}
+
+func (o *psh) IsMPU() bool {
+	if o == nil {
+		return false
+	} else if !o.IsStarted() {
+		return false
+	} else if i := o.prtList.Load(); i == nil {
+		return false
+	} else if v, k := i.([]sdksss.UploadPartOutput); !k {
+		// prtList stores []sdksss.UploadPartOutput, not []UploadInfo
+		return false
+	} else {
+		return len(v) > 0
+	}
+}
+
+func (o *psh) IsStarted() bool {
+	if o == nil {
+		return false
+	} else if o.run.Load() {
+		return true
+	} else if i := o.updInfo.Load(); i == nil {
+		return false
+	} else if v, k := i.(*sdksss.CreateMultipartUploadOutput); !k {
+		return false
+	} else if v.UploadId == nil || len(*v.UploadId) < 1 {
+		return false
+	} else {
+		return true
+	}
+}
+
+func (o *psh) Counter() int32 {
+	if o == nil {
+		return 0
+	} else if i := o.prtList.Load(); i == nil {
+		return 0
+	} else if v, k := i.([]sdksss.UploadPartOutput); !k {
+		return 0
+	} else {
+		return int32(len(v))
+	}
+}
+
+func (o *psh) CounterLeft() int32 {
+	return MaxNumberPart - o.Counter()
+}
diff --git a/aws/pusher/interface.go b/aws/pusher/interface.go
new file mode 100644
index 0000000..8d1a1e1
--- /dev/null
+++ b/aws/pusher/interface.go
@@ -0,0 +1,156 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2024 Nicolas JUHEL
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ */
+
+package pusher
+
+import (
+	"context"
+	"io"
+	"sync/atomic"
+
+	sdksss "github.com/aws/aws-sdk-go-v2/service/s3"
+	libsiz "github.com/nabbar/golib/size"
+)
+
+// ObjectInfo represents object information
+type ObjectInfo struct {
+	Bucket     string
+	Object     string
+	IsMPU      bool
+	TotalSize  libsiz.Size
+	NumberPart int32
+}
+
+// UploadInfo represents the last part upload information
+type UploadInfo struct {
+	IsMPU      bool
+	PartNumber int32
+	UploadID   string
+	Etag       string
+	Checksum   string
+}
+
+// FuncClientS3 represents a function returning a raw s3 client from the aws s3 service sdk
+type FuncClientS3 func() *sdksss.Client
+
+// FuncOnUpload represents a function called when a part or a complete object is uploaded
+type FuncOnUpload func(upd UploadInfo, obj ObjectInfo, e error)
+
+// FuncOnFinish represents a function called when the upload is aborted or completed
+type FuncOnFinish func(obj ObjectInfo, e error)
+
+// Pusher is a helper interface to upload an object to s3, directly or with multiple parts.
+// It is a writer interface, usable with io.Copy, that allows registering callbacks
+// and copying an object from s3 in multiple parts.
+type Pusher interface {
+	io.WriteCloser
+	io.ReaderFrom
+
+	// Abort aborts the current multipart upload. This operation is irreversible.
+	// This is the same as calling Close.
+	//
+	// This function stops the current multipart upload and releases any resources associated with it.
+	// Return type: error
+	Abort() error
+	// Complete completes the current multipart upload.
+	//
+	// This function finalizes the multipart upload and returns any error that occurred during the process.
+	// It concatenates all the parts into a single object and checks the checksum if enabled.
+	// Return type: error
+	Complete() error
+
+	// CopyFromS3 copies the content of an existing S3 object into the current upload. Not implemented yet.
+	CopyFromS3(bucket, object, versionId string) error
+
+	// GetPartSize returns the size of each part for a multipart upload, or the maximum size for a standard put object.
+	//
+	// Return type: libsiz.Size
+	GetPartSize() libsiz.Size
+	// GetObjectInfo returns information about the object being uploaded.
+	//
+	// This method provides details about the object, including its bucket, key, and total size.
+	// Return type: ObjectInfo
+	GetObjectInfo() ObjectInfo
+	// GetLastPartInfo returns the last part upload information.
+	//
+	// Return type: UploadInfo
+	GetLastPartInfo() UploadInfo
+	// GetObjectSize returns the total size of the object currently uploaded.
+	//
+	// Return type: libsiz.Size
+	GetObjectSize() libsiz.Size
+	// GetObjectSizeLeft returns the remaining size of the object to be uploaded.
+	//
+	// This method provides the size left to upload for the current object.
+	// Return type: libsiz.Size
+	GetObjectSizeLeft() libsiz.Size
+
+	// IsMPU indicates whether the current upload is a multipart upload.
+	//
+	// This method checks the type of the upload and returns true if it is a multipart upload, false otherwise.
+	// Return type: bool
+	IsMPU() bool
+	// IsStarted indicates whether the MPU creation or the put object call has already been done.
+	//
+	// Return type: bool
+	IsStarted() bool
+	// Counter returns the current number of parts uploaded.
+	//
+	// Return type: int32
+	Counter() int32
+	// CounterLeft returns the number of parts left before reaching the maximum part count.
+	//
+	// Return type: int32
+	CounterLeft() int32
+}
+
+func New(ctx context.Context, cfg *Config) (Pusher, error) {
+	if cfg == nil {
+		return nil, ErrInvalidInstance
+	}
+
+	p := &psh{
+		ctx:     ctx,
+		run:     new(atomic.Bool),
+		end:     new(atomic.Bool),
+		tmp:     new(atomic.Value),
+		cfg:     cfg,
+		prtSha2: new(atomic.Value),
+		prtMD5:  new(atomic.Value),
+		objSha2: new(atomic.Value),
+		objInfo: new(atomic.Value),
+		updInfo: new(atomic.Value),
+		nbrPart: new(atomic.Int32),
+		prtList: new(atomic.Value),
+		prtSize: new(atomic.Int64),
+		objSize: new(atomic.Int64),
+	}
+
+	// seeding the object info with at least the bucket and key, so the MPU can be created later
+	p.objInfo.Store(&sdksss.CreateMultipartUploadInput{
+		Bucket: cfg.ObjectS3Options.Bucket,
+		Key:    cfg.ObjectS3Options.Key,
+	})
+
+	if i, e := cfg.getWorkingFile(); e != nil {
+		return nil, e
+	} else {
+		p.tmp.Store(i)
+	}
+
+	return p, nil
+}
diff --git a/aws/pusher/iowriter.go b/aws/pusher/iowriter.go
new file mode 100644
index 0000000..1a11b6d
--- /dev/null
+++ b/aws/pusher/iowriter.go
@@ -0,0 +1,97 @@
+package pusher
+
+import (
+	"errors"
+	"io"
+
+	libsiz "github.com/nabbar/golib/size"
+)
+
+func (o *psh) Write(p []byte) (n int, err error) {
+	// checking incoming buffer is not empty
+	if len(p) < 1 {
+		return 0, nil
+	}
+
+	// writing into working part file
+	if n, err = o.fileWrite(p); err != nil {
+		return 0, err
+	}
+
+	// calculating md5 of part content (mandatory to upload part)
+	if i, e := o.md5Write(p); e != nil {
+		return 0, e
+	} else if i != n {
+		return n, io.ErrShortWrite
+	}
+
+	// calculating sha256 of full object (optional to upload part)
+	if i, e := o.shaObjWrite(p); e != nil {
+		return 0, e
+	} else if i != n {
+		return n, io.ErrShortWrite
+	}
+
+	// calculating sha256 of part content (optional to upload part)
+	if i, e := o.shaPartWrite(p); e != nil {
+		return 0, e
+	} else if i != n {
+		return n, io.ErrShortWrite
+	}
+
+	// checking if the part size has been reached; comparing directly
+	// avoids unsigned underflow when the part file exceeds the part size
+	if libsiz.SizeFromInt64(o.prtSize.Load()) >= o.GetPartSize() {
+		// pushing part
+		if e := o.pushObject(); e != nil {
+			return n, e
+		}
+	}
+
+	// returning number of bytes written
+	return n, nil
+}
+
+func (o *psh) Close() error {
+	// closing the pusher aborts any pending multipart upload
+	return o.Abort()
+}
+
+func (o *psh) ReadFrom(r io.Reader) (n int64, err error) {
+	var (
+		i  int   // number of bytes read
+		j  int   // number of bytes written
+		er error // read error
+		ew error // write error
+
+		p = make([]byte, o.cfg.getBufferSize()) // buffer
+	)
+
+	for {
+		// checking context
+		if o.ctx.Err() != nil {
+			return n, o.ctx.Err()
+		}
+
+		// reading from reader
+		i, er = r.Read(p)
+
+		// writing to object if possible
+		if i > 0 {
+			j, ew = o.Write(p[:i])
+			n += int64(j)
+		}
+
+		// clearing buffer
+		clear(p)
+
+		// checking errors
+		if er != nil && !errors.Is(er, io.EOF) {
+			return n, er
+		} else if ew != nil {
+			return n, ew
+		} else if er != nil {
+			return n, nil
+		}
+	}
+}
diff --git a/aws/pusher/model.go b/aws/pusher/model.go
new file mode 100644
index 0000000..07aa290
--- /dev/null
+++ b/aws/pusher/model.go
@@ -0,0 +1,69 @@
+package pusher
+
+import (
+	"context"
+	"sync/atomic"
+
+	sdksss "github.com/aws/aws-sdk-go-v2/service/s3"
+)
+
+type psh struct {
+	ctx context.Context
+	run *atomic.Bool  // is running / starting
+	end *atomic.Bool  // is closed
+	tmp *atomic.Value // working *os.File
+	cfg *Config       // configuration
+
+	prtSha2 *atomic.Value // part sha256
+	prtMD5  *atomic.Value // part md5
+	objSha2 *atomic.Value // object sha256
+
+	objInfo *atomic.Value // sdksss.CreateMultipartUploadInput
+	updInfo *atomic.Value // sdksss.CreateMultipartUploadOutput
+	nbrPart *atomic.Int32 // number of part
+	prtList *atomic.Value // []sdksss.UploadPartOutput - part list
+
+	prtSize *atomic.Int64 // current part size
+	objSize *atomic.Int64 // object size
+}
+
+func (o *psh) CopyFromS3(bucket, object, versionId string) error {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (o *psh) Abort() error {
+	var err error
+
+	if o == nil {
+		return ErrInvalidInstance
+	} else if i := o.updInfo.Load(); i == nil {
+		return nil
+	} else if v, k := i.(*sdksss.CreateMultipartUploadOutput); !k {
+		return nil
+	} else if v.UploadId == nil || len(*v.UploadId) < 1 {
+		return nil
+	} else if !o.IsMPU() {
+		return nil
+	}
+
+	// wrapping the callback in a closure so err is evaluated on return, not when the defer is registered
+	defer func() {
+		o.cfg.onAbort(o.GetObjectInfo(), err)
+	}()
+
+	err = o.abortUpload()
+	return err
+}
+
+func (o *psh) Complete() error {
+	var err error
+
+	if o == nil {
+		return ErrInvalidInstance
+	} else if !o.IsMPU() {
+		return o.pushObject()
+	}
+
+	// wrapping the callback in a closure so err is evaluated on return, not when the defer is registered
+	defer func() {
+		o.cfg.onComplete(o.GetObjectInfo(), err)
+	}()
+
+	// pushing any buffered data left in the working file as the last part before completing
+	if o.prtSize.Load() > 0 {
+		if err = o.pushObject(); err != nil {
+			return err
+		}
+	}
+
+	err = o.completeUpload()
+	return err
+}
diff --git a/aws/pusher/register.go b/aws/pusher/register.go
new file mode 100644
index 0000000..ed7042a
--- /dev/null
+++ b/aws/pusher/register.go
@@ -0,0 +1 @@
+package pusher
diff --git a/aws/pusher/s3.go b/aws/pusher/s3.go
new file mode 100644
index 0000000..a42237f
--- /dev/null
+++ b/aws/pusher/s3.go
@@ -0,0 +1,320 @@
+package pusher
+
+import (
+	"encoding/base64"
+	"time"
+
+	sdkaws "github.com/aws/aws-sdk-go-v2/aws"
+	sdksss "github.com/aws/aws-sdk-go-v2/service/s3"
+	sdktps "github.com/aws/aws-sdk-go-v2/service/s3/types"
+)
+
+func (o *psh) getUploadId() (string, error) {
+	if o == nil {
+		return "", ErrInvalidInstance
+	} else if i := o.updInfo.Load(); i == nil {
+		return o.createMPU()
+	} else if v, k := i.(*sdksss.CreateMultipartUploadOutput); !k {
+		return "", ErrInvalidInstance
+	} else if v.UploadId == nil {
+		return "", ErrInvalidUploadID
+	} else {
+		return *(v.UploadId), nil
+	}
+}
+
+func (o *psh) createMPU() (string, error) {
+	if o == nil {
+		return "", ErrInvalidInstance
+	} else if i := o.objInfo.Load(); i == nil {
+		return "", ErrInvalidInstance
+	} else if v, k := i.(*sdksss.CreateMultipartUploadInput); !k {
+		return "", ErrInvalidInstance
+	} else {
+		// working on a copy so the stored input is never mutated
+		var tmp = *v
+
+		if c := o.cfg.getClientS3(); c == nil {
+			return "", ErrInvalidClient
+		} else if out, err := c.CreateMultipartUpload(o.ctx, &tmp); err != nil {
+			return "", err
+		} else if out == nil {
+			return "", ErrInvalidResponse
+		} else if out.UploadId == nil || len(*out.UploadId) < 1 {
+			return "", ErrInvalidResponse
+		} else {
+			o.updInfo.Store(out)
+			return *(out.UploadId), nil
+		}
+	}
+}
+
+func (o *psh) getUploadPartInput() (*sdksss.UploadPartInput, error) {
+	var in *sdksss.UploadPartInput
+
+	if o == nil {
+		return nil, ErrInvalidInstance
+	} else {
+		in = o.cfg.getUploadPartInput()
+	}
+
+	if i, e := o.getUploadId(); e != nil {
+		return nil, e
+	} else if len(i) < 1 {
+		return nil, ErrInvalidUploadID
+	} else {
+		in.UploadId = sdkaws.String(i)
+	}
+
+	// rewinding the working file so the upload reads the part content from the start
+	if e := o.fileReset(); e != nil {
+		return nil, e
+	}
+
+	if i, e := o.getFile(); e != nil {
+		return nil, e
+	} else {
+		in.Body = i
+	}
+
+	if s := o.prtSize.Load(); s > 0 {
+		in.ContentLength = sdkaws.Int64(s)
+	} else {
+		return nil, ErrEmptyContents
+	}
+
+	if chk, err := o.md5Checksum(); err != nil {
+		return nil, err
+	} else if len(chk) < 1 {
+		return nil, ErrInvalidChecksum
+	} else {
+		in.ContentMD5 = sdkaws.String(base64.StdEncoding.EncodeToString(chk))
+	}
+
+	if chk, err := o.shaPartChecksum(); err == nil && len(chk) > 0 {
+		in.ChecksumAlgorithm = sdktps.ChecksumAlgorithmSha256
+		in.ChecksumSHA256 = sdkaws.String(base64.StdEncoding.EncodeToString(chk))
+	}
+
+	o.nbrPart.Add(1)
+	in.PartNumber = sdkaws.Int32(o.nbrPart.Load())
+
+	return in, nil
+}
+
+func (o *psh) getPutObjectInput() (*sdksss.PutObjectInput, error) {
+	var in *sdksss.PutObjectInput
+
+	if o == nil {
+		return nil, ErrInvalidInstance
+	}
+
+	in = o.cfg.getPutObjectInput()
+
+	// rewinding the working file so the upload reads the content from the start
+	if e := o.fileReset(); e != nil {
+		return nil, e
+	}
+
+	if f, e := o.getFile(); e != nil {
+		return nil, e
+	} else {
+		in.Body = f
+	}
+
+	if s := o.prtSize.Load(); s > 0 {
+		in.ContentLength = sdkaws.Int64(s)
+	} else {
+		return nil, ErrEmptyContents
+	}
+
+	if chk, err := o.md5Checksum(); err != nil {
+		return nil, err
+	} else if len(chk) < 1 {
+		return nil, ErrInvalidChecksum
+	} else {
+		in.ContentMD5 = sdkaws.String(base64.StdEncoding.EncodeToString(chk))
+	}
+
+	if chk, err := o.shaObjChecksum(); err == nil && len(chk) > 0 {
+		in.ChecksumAlgorithm = sdktps.ChecksumAlgorithmSha256
+		in.ChecksumSHA256 = sdkaws.String(base64.StdEncoding.EncodeToString(chk))
+	}
+
+	return in, nil
+}
+
+func (o *psh) pushObject() error {
+	var (
+		err error
+		ret bool
+		fct = o.pushMPUObject
+	)
+
+	if o == nil {
+		return ErrInvalidInstance
+	} else if !o.IsMPU() {
+		fct = o.pushSingleObject
+	}
+
+	// wrapping the callback in a closure so err is evaluated on return, not when the defer is registered
+	defer func() {
+		o.cfg.onUpload(o.GetLastPartInfo(), o.GetObjectInfo(), err)
+	}()
+
+	// retrying up to 10 times; ret reports a final result that must not be retried
+	for i := 0; i < 10; i++ {
+		if err, ret = fct(); err == nil {
+			return nil
+		} else if ret {
+			return err
+		}
+
+		time.Sleep(10 * time.Second)
+	}
+
+	return err
+}
+
+func (o *psh) pushMPUObject() (error, bool) {
+	if o == nil {
+		return ErrInvalidInstance, false
+	}
+
+	if in, e := o.getUploadPartInput(); e != nil {
+		// the part counter is only incremented once getUploadPartInput succeeds, so no rollback here
+		return e, false
+	} else if c := o.cfg.getClientS3(); c == nil {
+		o.nbrPart.Add(-1)
+		return ErrInvalidClient, false
+	} else if out, err := c.UploadPart(o.ctx, in); err != nil {
+		o.nbrPart.Add(-1)
+		return err, false
+	} else if out == nil || out.ETag == nil || len(*out.ETag) < 1 {
+		o.nbrPart.Add(-1)
+		return ErrInvalidResponse, false
+	} else if i := o.prtList.Load(); i == nil {
+		o.prtList.Store(append(make([]sdksss.UploadPartOutput, 0), *out))
+	} else if v, k := i.([]sdksss.UploadPartOutput); !k {
+		o.prtList.Store(append(make([]sdksss.UploadPartOutput, 0), *out))
+	} else {
+		o.prtList.Store(append(v, *out))
+	}
+
+	o.run.Store(true)
+
+	if e := o.fileTruncate(); e != nil {
+		return e, true
+	} else if e = o.md5Reset(); e != nil {
+		return e, true
+	} else {
+		return o.shaPartReset(), true
+	}
+}
+
+func (o *psh) pushSingleObject() (error, bool) {
+	if o == nil {
+		return ErrInvalidInstance, false
+	}
+
+	if o.IsMPU() {
+		return ErrInvalidInstance, false
+	}
+
+	if in, e := o.getPutObjectInput(); e != nil {
+		return e, false
+	} else if c := o.cfg.getClientS3(); c == nil {
+		return ErrInvalidClient, false
+	} else if _, err := c.PutObject(o.ctx, in); err != nil {
+		return err, false
+	} else {
+		o.run.Store(true)
+	}
+
+	if err := o.fileRemove(); err != nil {
+		return err, true
+	} else if err = o.md5Reset(); err != nil {
+		return err, true
+	} else {
+		return o.shaPartReset(), true
+	}
+}
+
+func (o *psh) getUploadInfo(uploadId string) (*sdktps.MultipartUpload, error) {
+	var (
+		c  *sdksss.Client
+		in *sdksss.ListMultipartUploadsInput
+	)
+
+	if o == nil {
+		return nil, ErrInvalidInstance
+	} else if c = o.cfg.getClientS3(); c == nil {
+		return nil, ErrInvalidClient
+	} else {
+		in = o.cfg.getListMultipartUploadsInput()
+	}
+
+	if l, e := c.ListMultipartUploads(o.ctx, in); e != nil {
+		return nil, e
+	} else if l == nil || len(l.Uploads) < 1 {
+		return nil, ErrInvalidUploadID
+	} else {
+		for _, v := range l.Uploads {
+			if v.UploadId != nil && *v.UploadId == uploadId {
+				return &v, nil
+			}
+		}
+	}
+
+	return nil, ErrInvalidUploadID
+}
+
+func (o *psh) abortUpload() error {
+	var in *sdksss.AbortMultipartUploadInput
+
+	if o == nil {
+		return ErrInvalidInstance
+	} else {
+		in = o.cfg.getAbortMultipartUploadInput()
+	}
+
+	if u, e := o.getUploadId(); e != nil {
+		return e
+	} else {
+		in.UploadId = sdkaws.String(u)
+	}
+
+	if c := o.cfg.getClientS3(); c == nil {
+		return ErrInvalidClient
+	} else if out, err := c.AbortMultipartUpload(o.ctx, in); err != nil {
+		return err
+	} else if out == nil {
+		return ErrInvalidResponse
+	} else {
+		return nil
+	}
+}
+
+func (o *psh) completeUpload() error {
+	var in *sdksss.CompleteMultipartUploadInput
+
+	if o == nil {
+		return ErrInvalidInstance
+	} else {
+		in = o.cfg.getCompleteMultipartUploadInput()
+	}
+
+	if u, e := o.getUploadId(); e != nil {
+		return e
+	} else {
+		in.UploadId = sdkaws.String(u)
+	}
+
+	if chk, err := o.shaObjChecksum(); err == nil && len(chk) > 0 {
+		in.ChecksumSHA256 = sdkaws.String(base64.StdEncoding.EncodeToString(chk))
+	}
+
+	if a := o.prtList.Load(); a == nil {
+		return ErrInvalidInstance
+	} else if l, k := a.([]sdksss.UploadPartOutput); !k {
+		return ErrInvalidInstance
+	} else if len(l) < 1 {
+		return ErrInvalidInstance
+	} else {
+		// converting the stored UploadPartOutput list into the CompletedPart list expected by the API;
+		// parts are stored in upload order, so the part number is the index + 1
+		prt := make([]sdktps.CompletedPart, 0, len(l))
+		for i := range l {
+			prt = append(prt, sdktps.CompletedPart{
+				ETag:           l[i].ETag,
+				ChecksumSHA256: l[i].ChecksumSHA256,
+				PartNumber:     sdkaws.Int32(int32(i + 1)),
+			})
+		}
+		in.MultipartUpload.Parts = prt
+	}
+
+	if c := o.cfg.getClientS3(); c == nil {
+		return ErrInvalidClient
+	} else if out, err := c.CompleteMultipartUpload(o.ctx, in); err != nil {
+		return err
+	} else if out == nil {
+		return ErrInvalidResponse
+	} else {
+		return nil
+	}
+}
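
Usage sketch (not part of the diff): a minimal example of how this pusher is expected to be wired, assuming the module path github.com/nabbar/golib for the import and a standard aws-sdk-go-v2 client setup; the bucket, key and local file names are placeholders.

package main

import (
	"context"
	"io"
	"log"
	"os"

	sdkaws "github.com/aws/aws-sdk-go-v2/aws"
	sdkcfg "github.com/aws/aws-sdk-go-v2/config"
	sdksss "github.com/aws/aws-sdk-go-v2/service/s3"

	"github.com/nabbar/golib/aws/pusher"
	libsiz "github.com/nabbar/golib/size"
)

func main() {
	ctx := context.Background()

	// loading the default AWS configuration and building a raw S3 client
	awsCfg, err := sdkcfg.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	cli := sdksss.NewFromConfig(awsCfg)

	cfg := &pusher.Config{
		FuncGetClientS3: func() *sdksss.Client { return cli },
		WorkingPath:     os.TempDir(),         // part file is created in this directory
		PartSize:        5 * libsiz.SizeMega,  // minimal part size
		CheckSum:        true,                 // enable SHA-256 integrity checks
	}
	cfg.ObjectS3Options.Bucket = sdkaws.String("my-bucket") // placeholder
	cfg.ObjectS3Options.Key = sdkaws.String("my-object")    // placeholder

	p, err := pusher.New(ctx, cfg)
	if err != nil {
		log.Fatal(err)
	}

	src, err := os.Open("local-file.bin") // placeholder
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = src.Close() }()

	// io.Copy drives the ReadFrom path; a part is pushed each time PartSize is reached
	if _, err = io.Copy(p, src); err != nil {
		_ = p.Abort()
		log.Fatal(err)
	}

	// Complete finalizes the MPU, or falls back to a single PutObject
	// when less than one full part was written
	if err = p.Complete(); err != nil {
		log.Fatal(err)
	}
}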