Skip to content

Commit

Permalink
feat: #63 support downloading files from s3 storage
Browse files Browse the repository at this point in the history
  • Loading branch information
bohdan-shulha committed Oct 14, 2024
1 parent ecb13d9 commit a0cd1e3
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 125 deletions.
2 changes: 2 additions & 0 deletions internal/app/ptah-agent/parse_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func parseTask(taskType int, payload string) (interface{}, error) {
return unmarshalTask(payload, &ptahClient.UpdateDirdReq{})
case 21:
return unmarshalTask(payload, &ptahClient.LaunchServiceReq{})
case 22:
return unmarshalTask(payload, &ptahClient.S3DownloadReq{})
default:
return nil, fmt.Errorf("parse task: unknown task type %d", taskType)
}
Expand Down
65 changes: 47 additions & 18 deletions internal/app/ptah-agent/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ import (
t "github.com/ptah-sh/ptah-agent/internal/pkg/ptah-client"
)

//go:embed s3.sh
var s3Script string
//go:embed s3_download.sh
var s3DownloadScript string

//go:embed s3_upload.sh
var s3UploadScript string

func (e *taskExecutor) createS3Storage(ctx context.Context, req *t.CreateS3StorageReq) (*t.CreateS3StorageRes, error) {
var res t.CreateS3StorageRes
Expand Down Expand Up @@ -51,7 +54,11 @@ func (e *taskExecutor) createS3Storage(ctx context.Context, req *t.CreateS3Stora
}

func (e *taskExecutor) checkS3Storage(ctx context.Context, req *t.CheckS3StorageReq) (*t.CheckS3StorageRes, error) {
_, err := e.uploadS3FileWithHelper(ctx, []mount.Mount{}, t.ArchiveSpec{}, false, req.S3StorageConfigName, "/tmp/check-access.txt", ".check-access")
envVars := []string{
fmt.Sprintf("ARCHIVE_FORMAT=%s", "tar.gz"),
}

_, err := e.uploadS3FileWithHelper(ctx, []mount.Mount{}, envVars, req.S3StorageConfigName, "/tmp/check-access.txt", ".check-access.tar.gz")
if err != nil {
return nil, err
}
Expand All @@ -60,7 +67,11 @@ func (e *taskExecutor) checkS3Storage(ctx context.Context, req *t.CheckS3Storage
}

func (e *taskExecutor) s3upload(ctx context.Context, req *t.S3UploadReq) (*t.S3UploadRes, error) {
output, err := e.uploadS3FileWithHelper(ctx, []mount.Mount{req.VolumeSpec}, req.Archive, req.RemoveSrcFile, req.S3StorageConfigName, req.SrcFilePath, req.DestFilePath)
envVars := []string{
fmt.Sprintf("ARCHIVE_FORMAT=%s", req.Archive.Format),
}

output, err := e.uploadS3FileWithHelper(ctx, []mount.Mount{req.VolumeSpec}, envVars, req.S3StorageConfigName, req.SrcFilePath, req.DestFilePath)
if err != nil {
return nil, err
}
Expand All @@ -70,7 +81,24 @@ func (e *taskExecutor) s3upload(ctx context.Context, req *t.S3UploadReq) (*t.S3U
}, nil
}

func (e *taskExecutor) uploadS3FileWithHelper(ctx context.Context, mounts []mount.Mount, archiveSpec t.ArchiveSpec, removeSrcFile bool, s3StorageConfigName, srcFilePath, destFilePath string) ([]string, error) {
func (e *taskExecutor) s3download(ctx context.Context, req *t.S3DownloadReq) (*t.S3DownloadRes, error) {
envVars := []string{}

logs, err := e.runS3Cmd(ctx, []mount.Mount{req.VolumeSpec}, envVars, s3DownloadScript, req.S3StorageConfigName, req.SrcFilePath, req.DestFilePath)
if err != nil {
return nil, fmt.Errorf("download from s3: %w", err)
}

return &t.S3DownloadRes{
Output: logs,
}, nil
}

func (e *taskExecutor) uploadS3FileWithHelper(ctx context.Context, mounts []mount.Mount, envVars []string, s3StorageConfigName, srcFilePath, destFilePath string) ([]string, error) {
return e.runS3Cmd(ctx, mounts, envVars, s3UploadScript, s3StorageConfigName, srcFilePath, destFilePath)
}

func (e *taskExecutor) runS3Cmd(ctx context.Context, mounts []mount.Mount, envVars []string, s3Script, s3StorageConfigName, srcFilePath, destFilePath string) ([]string, error) {
credentialsConfig, err := e.getConfigByName(ctx, s3StorageConfigName)
if err != nil {
return nil, fmt.Errorf("check s3 storage: get config: %w", err)
Expand Down Expand Up @@ -100,6 +128,19 @@ func (e *taskExecutor) uploadS3FileWithHelper(ctx context.Context, mounts []moun
return nil, fmt.Errorf("check s3 storage: read image pull: %w", err)
}

vars := []string{
fmt.Sprintf("S3_ACCESS_KEY=%s", s3StorageSpec.AccessKey),
fmt.Sprintf("S3_SECRET_KEY=%s", s3StorageSpec.SecretKey),
fmt.Sprintf("S3_ENDPOINT=%s", s3StorageSpec.Endpoint),
fmt.Sprintf("S3_REGION=%s", s3StorageSpec.Region),
fmt.Sprintf("S3_BUCKET=%s", s3StorageSpec.Bucket),
fmt.Sprintf("PATH_PREFIX=%s", strings.Trim(s3StorageSpec.PathPrefix, "/")),
fmt.Sprintf("SRC_FILE_PATH=%s", strings.TrimPrefix(srcFilePath, "/")),
fmt.Sprintf("DEST_FILE_PATH=%s", strings.TrimPrefix(destFilePath, "/")),
}

vars = append(vars, envVars...)

createResponse, err := e.docker.ContainerCreate(ctx, &container.Config{
User: "root",
Image: "d3fk/s3cmd",
Expand All @@ -110,19 +151,7 @@ func (e *taskExecutor) uploadS3FileWithHelper(ctx context.Context, mounts []moun
Cmd: []string{
s3Script,
},
Env: []string{
fmt.Sprintf("ARCHIVE_ENABLED=%t", archiveSpec.Enabled),
fmt.Sprintf("ARCHIVE_FORMAT=%s", archiveSpec.Format),
fmt.Sprintf("S3_ACCESS_KEY=%s", s3StorageSpec.AccessKey),
fmt.Sprintf("S3_SECRET_KEY=%s", s3StorageSpec.SecretKey),
fmt.Sprintf("S3_ENDPOINT=%s", s3StorageSpec.Endpoint),
fmt.Sprintf("S3_REGION=%s", s3StorageSpec.Region),
fmt.Sprintf("S3_BUCKET=%s", s3StorageSpec.Bucket),
fmt.Sprintf("PATH_PREFIX=%s", strings.Trim(s3StorageSpec.PathPrefix, "/")),
fmt.Sprintf("SRC_FILE_PATH=%s", srcFilePath),
fmt.Sprintf("DEST_FILE_PATH=%s", strings.TrimPrefix(destFilePath, "/")),
fmt.Sprintf("REMOVE_SRC_FILE=%t", removeSrcFile),
},
Env: vars,
}, &container.HostConfig{
Mounts: mounts,
}, &network.NetworkingConfig{}, &v1.Platform{}, containerName)
Expand Down
102 changes: 0 additions & 102 deletions internal/app/ptah-agent/s3.sh

This file was deleted.

73 changes: 73 additions & 0 deletions internal/app/ptah-agent/s3_download.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#!/bin/sh

set -e

echo "Starting s3 download script validation"

# Function to check if a variable is set and not empty
check_var() {
eval value=\$$1
if [ -z "$value" ]; then
echo "Error: $1 is not set or is empty"
exit 1
fi
}

check_var SRC_FILE_PATH
check_var DEST_FILE_PATH
check_var S3_ACCESS_KEY
check_var S3_SECRET_KEY
check_var S3_ENDPOINT
check_var S3_REGION
check_var S3_BUCKET
check_var PATH_PREFIX

DEST_FILE_PATH="/$DEST_FILE_PATH"

echo "Removing $DEST_FILE_PATH/*"
rm -rf "$DEST_FILE_PATH/*" 2> /dev/null || true

echo "Downloading from s3://$S3_BUCKET/$PATH_PREFIX/$SRC_FILE_PATH"

case "$SRC_FILE_PATH" in
*".tar.gz")
ARCHIVE_FORMAT="tar.gz"
;;
*".zip")
ARCHIVE_FORMAT="zip"
;;
*)
echo "Unsupported archive format: $SRC_FILE_PATH"
exit 1
;;
esac

ARCHIVE_PATH="/tmp/archive.$ARCHIVE_FORMAT"

s3cmd --access_key "$S3_ACCESS_KEY" \
--secret_key "$S3_SECRET_KEY" \
--host "$S3_ENDPOINT" \
--host-bucket "$S3_ENDPOINT" \
--region "$S3_REGION" \
get "s3://$S3_BUCKET/$PATH_PREFIX/$SRC_FILE_PATH" "$ARCHIVE_PATH"

echo "Extracting $SRC_FILE_PATH to $DEST_FILE_PATH"

case "$ARCHIVE_FORMAT" in
"tar.gz")
tar -xzvf "$ARCHIVE_PATH" -C "$DEST_FILE_PATH"
;;
"zip")
apk add zip
unzip "$ARCHIVE_PATH"
;;
*)
echo "Unsupported archive format: $ARCHIVE_FORMAT"
exit 1
;;
esac

echo "Removing $ARCHIVE_PATH"
rm -f "$ARCHIVE_PATH"

echo "Done"
67 changes: 67 additions & 0 deletions internal/app/ptah-agent/s3_upload.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/bin/sh

set -e

echo 'https://ptah.sh' > /tmp/check-access.txt

echo "Starting s3 upload script validation"

# Function to check if a variable is set and not empty
check_var() {
eval value=\$$1
if [ -z "$value" ]; then
echo "Error: $1 is not set or is empty"
exit 1
fi
}

check_var ARCHIVE_FORMAT
check_var SRC_FILE_PATH
check_var DEST_FILE_PATH
check_var S3_ACCESS_KEY
check_var S3_SECRET_KEY
check_var S3_ENDPOINT
check_var S3_REGION
check_var S3_BUCKET
check_var PATH_PREFIX

SRC_FILE_PATH="/$SRC_FILE_PATH"

if [ -d "$SRC_FILE_PATH" ]; then
cd "$SRC_FILE_PATH"
else
cd "$(dirname "$SRC_FILE_PATH")"
fi

echo "Archiving $SRC_FILE_PATH"

SRC_FILE_PATH="/tmp/archive.$ARCHIVE_FORMAT"

case "$ARCHIVE_FORMAT" in
"tar.gz")
tar -czvf "$SRC_FILE_PATH" "."
;;
"zip")
apk add zip
zip -r "$SRC_FILE_PATH" "."
;;
*)
echo "Unsupported archive format: $ARCHIVE_FORMAT"
exit 1
;;
esac

echo "Uploading $SRC_FILE_PATH to s3://$S3_BUCKET/$PATH_PREFIX/$DEST_FILE_PATH"

s3cmd --guess-mime-type \
--access_key "$S3_ACCESS_KEY" \
--secret_key "$S3_SECRET_KEY" \
--host "$S3_ENDPOINT" \
--host-bucket "$S3_ENDPOINT" \
--region "$S3_REGION" \
put "$SRC_FILE_PATH" "s3://$S3_BUCKET/$PATH_PREFIX/$DEST_FILE_PATH"

echo "Removing $SRC_FILE_PATH/*"
rm -rf "$SRC_FILE_PATH/*" 2> /dev/null || true

echo "Done"
2 changes: 2 additions & 0 deletions internal/app/ptah-agent/safe_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ func (c *SafeClient) StartBackgroundRequestsProcessing(ctx context.Context) erro
err := c.PerformBackgroundRequests(ctx)
if err != nil {
log.Println("error performing background requests:", err)

time.Sleep(4 * time.Second)
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions internal/app/ptah-agent/service_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,10 @@ func (e *taskExecutor) monitorJobServiceLaunch(ctx context.Context, service *swa
return errors.Errorf("task %s failed: %s\n%s", t.ID, t.Status.Err, logs)
}

containerID = t.Status.ContainerStatus.ContainerID
break
if t.Status.ContainerStatus != nil {
containerID = t.Status.ContainerStatus.ContainerID
break
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions internal/app/ptah-agent/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func (e *taskExecutor) executeTask(ctx context.Context, task interface{}) (inter
return e.updateDird(ctx, task.(*t.UpdateDirdReq))
case *t.LaunchServiceReq:
return e.launchDockerService(ctx, task.(*t.LaunchServiceReq))
case *t.S3DownloadReq:
return e.s3download(ctx, task.(*t.S3DownloadReq))
default:
return nil, fmt.Errorf("execute task: unknown task type %T", task)
}
Expand Down
Loading

0 comments on commit a0cd1e3

Please sign in to comment.