Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

command: add pipe command (#182) #587

Merged
merged 39 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
6e633e2
command: add pipe command (#182)
ahmethakanbesel Jul 6, 2023
8bf1662
refactor: remove unused parameter
ahmethakanbesel Jul 6, 2023
f127996
test: update expected content type for Windows
ahmethakanbesel Jul 6, 2023
84789d5
style: reformat the code
ahmethakanbesel Jul 6, 2023
3c5c712
command/pipe: don't print source url
ahmethakanbesel Jul 7, 2023
19d2675
command/pipe: refactor stdin abstraction
ahmethakanbesel Jul 7, 2023
3b6d0a8
command/pipe: remove unnecessary glacier flags
ahmethakanbesel Jul 7, 2023
1ae3085
Merge branch 'master' into stdin-pipe
ahmethakanbesel Jul 7, 2023
3d174e7
style: reformat the code
ahmethakanbesel Jul 7, 2023
deaf12d
refactor: update comment
ahmethakanbesel Jul 11, 2023
b3cd89c
Merge branch 'master' into stdin-pipe
ahmethakanbesel Jul 11, 2023
3b0625a
Merge branch 'master' into stdin-pipe
ahmethakanbesel Jul 21, 2023
12b54c8
command/pipe: rename a function
ahmethakanbesel Jul 21, 2023
7a4ef26
command/pipe: rename a function
ahmethakanbesel Jul 21, 2023
dd63775
changelog: add `pipe` entry
ahmethakanbesel Jul 24, 2023
1de901f
Merge branch 'master' into stdin-pipe
ahmethakanbesel Jul 24, 2023
ee865d4
changelog: fix typo
ahmethakanbesel Jul 24, 2023
7cac524
Merge branch 'master' into stdin-pipe
ahmethakanbesel Jul 24, 2023
a4ffc79
Merge branch 'master' into stdin-pipe
igungor Jul 24, 2023
c563a04
command/app: replace pipe command
ahmethakanbesel Jul 25, 2023
abc4c2e
command/pipe: update the example
ahmethakanbesel Jul 25, 2023
2f448b2
command/pipe: add content-disposition flag
ahmethakanbesel Jul 25, 2023
35645c4
command/pipe: reformat the code
ahmethakanbesel Jul 25, 2023
b75cde6
command/pipe: simplify the implementation
ahmethakanbesel Jul 25, 2023
d436d84
command/pipe: remove stickyerr
ahmethakanbesel Jul 25, 2023
42c47ca
command/pipe: update dst error text
ahmethakanbesel Jul 25, 2023
c869dd1
command/pipe: update the tests
ahmethakanbesel Jul 25, 2023
13ec7d0
e2e/pipe: add a test case for `--content-dispositon`
ahmethakanbesel Jul 25, 2023
3dfcb35
Merge branch 'master' into stdin-pipe
ahmethakanbesel Jul 25, 2023
a54340f
e2e/pipe: remove an empty line
ahmethakanbesel Jul 27, 2023
a572980
command/pipe: refactor
ahmethakanbesel Jul 27, 2023
0140a9e
command/pipe: reformat
ahmethakanbesel Jul 27, 2023
bb450a1
command/pipe: remove `no-such-upload-retry-count` flag
ahmethakanbesel Jul 27, 2023
7631f0e
e2e/pipe: remove a test case
ahmethakanbesel Jul 27, 2023
ec798f7
Merge branch 'master' into stdin-pipe
ahmethakanbesel Jul 27, 2023
89dbf87
chore: update go.sum
ahmethakanbesel Jul 27, 2023
e128849
commmand/pipe: rename a variable
ahmethakanbesel Jul 27, 2023
80ef892
command/pipe: update usage text
ahmethakanbesel Jul 27, 2023
8f4967d
command/pipe: update the example
ahmethakanbesel Jul 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Features
- Added `--content-disposition` flag to `cp` command. ([#569](https://github.com/peak/s5cmd/issues/569))
- Added `--show-fullpath` flag to `ls`. ([#596](https://github.com/peak/s5cmd/issues/596))
- Added `pipe` command. ([#182](https://github.com/peak/s5cmd/issues/182))

#### Improvements
- Implemented concurrent multipart download support for `cat`. ([#245](https://github.com/peak/s5cmd/issues/245))
Expand Down
1 change: 1 addition & 0 deletions command/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func Commands() []*cli.Command {
NewSelectCommand(),
NewSizeCommand(),
NewCatCommand(),
NewPipeCommand(),
NewRunCommand(),
NewSyncCommand(),
NewVersionCommand(),
Expand Down
4 changes: 3 additions & 1 deletion command/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
func printDebug(op string, err error, urls ...*url.URL) {
command := op
for _, url := range urls {
command += fmt.Sprintf(" %s", url)
if url != nil {
command += fmt.Sprintf(" %s", url)
}
}

msg := log.DebugMessage{
Expand Down
322 changes: 322 additions & 0 deletions command/pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,322 @@
package command

import (
"context"
"fmt"
"mime"
"os"
"path/filepath"

"github.com/urfave/cli/v2"

errorpkg "github.com/peak/s5cmd/v2/error"
"github.com/peak/s5cmd/v2/log"
"github.com/peak/s5cmd/v2/log/stat"
"github.com/peak/s5cmd/v2/storage"
"github.com/peak/s5cmd/v2/storage/url"
)

var pipeHelpTemplate = `Name:
{{.HelpName}} - {{.Usage}}

Usage:
{{.HelpName}} [options] destination

Options:
{{range .VisibleFlags}}{{.}}
{{end}}
Examples:
01. Stream stdin to an object
> echo "content" | gzip | s5cmd {{.HelpName}} s3://bucket/prefix/object.gz
`

func NewPipeCommandFlags() []cli.Flag {
pipeFlags := []cli.Flag{
&cli.StringFlag{
Name: "storage-class",
Usage: "set storage class for target ('STANDARD','REDUCED_REDUNDANCY','GLACIER','STANDARD_IA','ONEZONE_IA','INTELLIGENT_TIERING','DEEP_ARCHIVE')",
},
&cli.IntFlag{
Name: "concurrency",
Aliases: []string{"c"},
Value: defaultCopyConcurrency,
Usage: "number of concurrent parts transferred between host and remote server",
},
&cli.IntFlag{
Name: "part-size",
Aliases: []string{"p"},
Value: defaultPartSize,
Usage: "size of each part transferred between host and remote server, in MiB",
},
&cli.StringFlag{
Name: "sse",
Usage: "perform server side encryption of the data at its destination, e.g. aws:kms",
},
&cli.StringFlag{
Name: "sse-kms-key-id",
Usage: "customer master key (CMK) id for SSE-KMS encryption; leave it out if server-side generated key is desired",
},
&cli.StringFlag{
Name: "acl",
Usage: "set acl for target: defines granted accesses and their types on different accounts/groups, e.g. pipe --acl 'public-read'",
},
&cli.StringFlag{
Name: "cache-control",
Usage: "set cache control for target: defines cache control header for object, e.g. pipe --cache-control 'public, max-age=345600'",
},
&cli.StringFlag{
Name: "expires",
Usage: "set expires for target (uses RFC3339 format): defines expires header for object, e.g. pipe --expires '2024-10-01T20:30:00Z'",
},
&cli.BoolFlag{
Name: "raw",
Usage: "disable the wildcard operations, useful with filenames that contains glob characters",
},
&cli.StringFlag{
Name: "content-type",
Usage: "set content type for target: defines content type header for object, e.g. --content-type text/plain",
},
&cli.StringFlag{
igungor marked this conversation as resolved.
Show resolved Hide resolved
Name: "content-encoding",
Usage: "set content encoding for target: defines content encoding header for object, e.g. --content-encoding gzip",
},
&cli.StringFlag{
Name: "content-disposition",
Usage: "set content disposition for target: defines content disposition header for object, e.g. --content-disposition 'attachment; filename=\"filename.jpg\"'",
},
&cli.BoolFlag{
Name: "no-clobber",
Aliases: []string{"n"},
Usage: "do not overwrite destination if already exists",
},
}
return pipeFlags
}

func NewPipeCommand() *cli.Command {
cmd := &cli.Command{
Name: "pipe",
HelpName: "pipe",
Usage: "stream to remote from stdin",
Flags: NewPipeCommandFlags(),
CustomHelpTemplate: pipeHelpTemplate,
Before: func(c *cli.Context) error {
err := validatePipeCommand(c)
if err != nil {
printError(commandFromContext(c), c.Command.Name, err)
}
return err
},
Action: func(c *cli.Context) (err error) {
defer stat.Collect(c.Command.FullName(), &err)()

pipe, err := NewPipe(c, false)
if err != nil {
return err
}
return pipe.Run(c.Context)
},
}

cmd.BashComplete = getBashCompleteFn(cmd, false, false)
return cmd
}

// Pipe holds pipe operation flags and states.
type Pipe struct {
dst *url.URL
op string
fullCommand string

deleteSource bool

// flags
noClobber bool
storageClass storage.StorageClass
encryptionMethod string
encryptionKeyID string
acl string
cacheControl string
expires string
contentType string
contentEncoding string
contentDisposition string

// s3 options
concurrency int
partSize int64
storageOpts storage.Options
}

// NewPipe creates Pipe from cli.Context.
func NewPipe(c *cli.Context, deleteSource bool) (*Pipe, error) {
fullCommand := commandFromContext(c)

dst, err := url.New(c.Args().Get(0), url.WithRaw(c.Bool("raw")))
if err != nil {
printError(fullCommand, c.Command.Name, err)
return nil, err
}

return &Pipe{
dst: dst,
op: c.Command.Name,
fullCommand: fullCommand,
deleteSource: deleteSource,
// flags
noClobber: c.Bool("no-clobber"),
storageClass: storage.StorageClass(c.String("storage-class")),
concurrency: c.Int("concurrency"),
partSize: c.Int64("part-size") * megabytes,
encryptionMethod: c.String("sse"),
encryptionKeyID: c.String("sse-kms-key-id"),
acl: c.String("acl"),
cacheControl: c.String("cache-control"),
expires: c.String("expires"),
contentType: c.String("content-type"),
contentEncoding: c.String("content-encoding"),
contentDisposition: c.String("content-disposition"),

// s3 options
storageOpts: NewStorageOpts(c),
}, nil
}

// Run starts copying stdin output to destination.
func (c Pipe) Run(ctx context.Context) error {
if c.dst.IsBucket() || c.dst.IsPrefix() {
return fmt.Errorf("target %q must be an object", c.dst)
}
igungor marked this conversation as resolved.
Show resolved Hide resolved

err := c.shouldOverride(ctx, c.dst)
if err != nil {
if errorpkg.IsWarning(err) {
printDebug(c.op, err, nil, c.dst)
return nil
}
return err
}
igungor marked this conversation as resolved.
Show resolved Hide resolved

client, err := storage.NewRemoteClient(ctx, c.dst, c.storageOpts)
if err != nil {
return err
}
igungor marked this conversation as resolved.
Show resolved Hide resolved

metadata := storage.NewMetadata().
SetStorageClass(string(c.storageClass)).
SetSSE(c.encryptionMethod).
SetSSEKeyID(c.encryptionKeyID).
SetACL(c.acl).
SetCacheControl(c.cacheControl).
SetExpires(c.expires)

if c.contentType != "" {
metadata.SetContentType(c.contentType)
} else {
metadata.SetContentType(guessContentTypeByExtension(c.dst))
}
igungor marked this conversation as resolved.
Show resolved Hide resolved

if c.contentEncoding != "" {
metadata.SetContentEncoding(c.contentEncoding)
}

if c.contentDisposition != "" {
igungor marked this conversation as resolved.
Show resolved Hide resolved
metadata.SetContentDisposition(c.contentDisposition)
}

igungor marked this conversation as resolved.
Show resolved Hide resolved
err = client.Put(ctx, &stdin{file: os.Stdin}, c.dst, metadata, c.concurrency, c.partSize)
if err != nil {
return err
}

msg := log.InfoMessage{
igungor marked this conversation as resolved.
Show resolved Hide resolved
Operation: c.op,
Source: nil,
Destination: c.dst,
Object: &storage.Object{
StorageClass: c.storageClass,
},
}
log.Info(msg)
igungor marked this conversation as resolved.
Show resolved Hide resolved

return nil
}

// shouldOverride function checks if the destination should be overridden if
// the destination object and given pipe flags conform to the
// override criteria.
func (c Pipe) shouldOverride(ctx context.Context, dsturl *url.URL) error {
// if not asked to override, ignore.
if !c.noClobber {
return nil
}

client, err := storage.NewClient(ctx, dsturl, c.storageOpts)
if err != nil {
return err
}

obj, err := getObject(ctx, dsturl, client)
if err != nil {
return err
}

// if destination not exists, no conditions apply.
if obj == nil {
return nil
}

if c.noClobber {
return errorpkg.ErrObjectExists
}

return nil
}

func validatePipeCommand(c *cli.Context) error {
if c.Args().Len() != 1 {
return fmt.Errorf("expected destination argument")
}

dst := c.Args().Get(0)

dsturl, err := url.New(dst, url.WithRaw(c.Bool("raw")))
if err != nil {
return err
}

igungor marked this conversation as resolved.
Show resolved Hide resolved
if !dsturl.IsRemote() {
return fmt.Errorf("destination must be a bucket")
}

if dsturl.IsBucket() || dsturl.IsPrefix() {
return fmt.Errorf("target %q must be an object", dsturl)
}

// wildcard destination can not be used with pipe
if dsturl.IsWildcard() {
return fmt.Errorf("target %q can not contain glob characters", dst)
}

return nil
}

func guessContentTypeByExtension(dsturl *url.URL) string {
contentType := mime.TypeByExtension(filepath.Ext(dsturl.Absolute()))
if contentType == "" {
return "application/octet-stream"
}
return contentType
}

// stdin is an io.Reader adapter for os.File, enabling it to function solely as
// an io.Reader. The AWS SDK, which accepts an io.Reader for multipart uploads,
// will attempt to use io.Seek if the reader supports it. However, os.Stdin is
// a specific type of file that can not seekable.
type stdin struct {
file *os.File
}

func (s *stdin) Read(p []byte) (n int, err error) {
return s.file.Read(p)
}
Loading