Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multipart blob download #5715

Open
wants to merge 25 commits into
base: master
Choose a base branch
from

Conversation

wayner0628
Copy link
Contributor

@wayner0628 wayner0628 commented Sep 1, 2024

Tracking issue

#3632

Why are the changes needed?

Supporting multipart blob downloads allows us to completely copy the specified directory into the input path.

What changes were proposed in this pull request?

  • Using new storage List api to collect items under container before download
  • Implement List api for memory storage
  • Parallel download

How was this patch tested?

unit tests, specifically in download_test.go

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

flyteorg/flytekit#2258

Docs link

NA

Copy link

codecov bot commented Sep 1, 2024

Codecov Report

Attention: Patch coverage is 58.16327% with 41 lines in your changes missing coverage. Please review.

Project coverage is 36.74%. Comparing base (59bf191) to head (acc16c8).
Report is 49 commits behind head on master.

Files with missing lines Patch % Lines
flytecopilot/data/download.go 67.05% 21 Missing and 7 partials ⚠️
flytestdlib/storage/mem_store.go 0.00% 11 Missing ⚠️
flytestdlib/storage/storage.go 0.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5715      +/-   ##
==========================================
+ Coverage   36.21%   36.74%   +0.52%     
==========================================
  Files        1303     1304       +1     
  Lines      109644   130160   +20516     
==========================================
+ Hits        39710    47829    +8119     
- Misses      65810    78148   +12338     
- Partials     4124     4183      +59     
Flag Coverage Δ
unittests-datacatalog 51.58% <ø> (+0.21%) ⬆️
unittests-flyteadmin 54.41% <ø> (-1.19%) ⬇️
unittests-flytecopilot 19.35% <67.05%> (+7.18%) ⬆️
unittests-flytectl 62.40% <ø> (+0.18%) ⬆️
unittests-flyteidl 6.89% <ø> (-0.24%) ⬇️
unittests-flyteplugins 53.62% <ø> (+0.27%) ⬆️
unittests-flytepropeller 42.84% <ø> (+1.08%) ⬆️
unittests-flytestdlib 54.67% <0.00%> (-0.55%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Signed-off-by: wayner0628 <[email protected]>
Signed-off-by: wayner0628 <[email protected]>
Signed-off-by: wayner0628 <[email protected]>
Signed-off-by: wayner0628 <[email protected]>
Signed-off-by: wayner0628 <[email protected]>
Signed-off-by: wayner0628 <[email protected]>
Signed-off-by: wayner0628 <[email protected]>
Signed-off-by: wayner0628 <[email protected]>
Signed-off-by: wayner0628 <[email protected]>
Signed-off-by: wayner0628 <[email protected]>
@wayner0628 wayner0628 marked this pull request as ready for review September 5, 2024 19:14
Copy link
Contributor

@wild-endeavor wild-endeavor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @wayner0628 - i think this is good. I want to get @eapolinario or @EngHabu to take a quick look at this as well though. This is a pretty core interface that's changing in this PR.

@@ -78,6 +78,9 @@ type RawStore interface {
// Head gets metadata about the reference. This should generally be a light weight operation.
Head(ctx context.Context, reference DataReference) (Metadata, error)

// GetItems retrieves the paths of all items from the Blob store or an error
GetItems(ctx context.Context, reference DataReference) ([]string, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would this be more accurately named ListItems? Also what is retrieved? The relative path to the reference input? can we add comment?

flytecopilot/data/download.go Show resolved Hide resolved
@@ -54,6 +55,23 @@ func (s *InMemoryStore) Head(ctx context.Context, reference DataReference) (Meta
}, nil
}

func (s *InMemoryStore) GetItems(ctx context.Context, reference DataReference) ([]string, error) {
var items []string
prefix := string(reference) + "/"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will reference ever already have a /?

Copy link
Member

@Future-Outlier Future-Outlier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @wayner0628
Can you test cases like this PR?
flyteorg/flytekit#2258
To be more specifically, this case

flyte_dir_io = ContainerTask(
    name="flyte_dir_io",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(inputs=FlyteDirectory),
    outputs=kwtypes(out=FlyteDirectory),
    image="futureoutlier/rawcontainer:0320",
    command=[
        "python",
        "write_flytedir.py",
        "{{.inputs.inputs}}",
        "/var/outputs/out",
    ],
)

If possible, please proivde screenshot, thank you.

@wild-endeavor
Copy link
Contributor

There is also this PR, https://github.com/flyteorg/flyte/pull/5674/files which I think we should merge first. The change to core api should probably be done separately.

@wild-endeavor
Copy link
Contributor

@wayner0628 #5741 this was just merged, adding a list api to the storage client. mind using the new interface to do this?

@wayner0628
Copy link
Contributor Author

@wild-endeavor No problem, I'll update this PR to align with the new interface.

Copy link
Member

@Future-Outlier Future-Outlier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tips to develop copilot in single binary.

  1. config
plugins:
  logs:
    dynamic-log-links:
      - comet-ml-execution-id:
          displayName: Comet
          templateUris: "{{ .taskConfig.host }}/{{ .taskConfig.workspace }}/{{ .taskConfig.project_name }}/{{ .executionName }}{{ .nodeId }}{{ .taskRetryAttempt }}{{ .taskConfig.link_suffix }}"
      - comet-ml-custom-id:
          displayName: Comet
          templateUris: "{{ .taskConfig.host }}/{{ .taskConfig.workspace }}/{{ .taskConfig.project_name }}/{{ .taskConfig.experiment_key }}"

    kubernetes-enabled: true
    kubernetes-template-uri: http://localhost:30080/kubernetes-dashboard/#/log/{{.namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}
    cloudwatch-enabled: false
    stackdriver-enabled: false
  k8s:
    default-env-vars:
      - FLYTE_AWS_ENDPOINT: "http://flyte-sandbox-minio.flyte:9000"
      - FLYTE_AWS_ACCESS_KEY_ID: minio
      - FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage
      - MLFLOW_TRACKING_URI: postgresql+psycopg2://postgres:@postgres.flyte.svc.cluster.local:5432/flyteadmin
    co-pilot:
          image: "localhost:30000/copilot-flytefile:0603"
  1. how to build copilot image?
    use Dockerfile.flytecopilot to build it.

@wayner0628 wayner0628 closed this Sep 15, 2024
@wayner0628 wayner0628 reopened this Sep 15, 2024
Signed-off-by: wayner0628 <[email protected]>
Signed-off-by: wayner0628 <[email protected]>
Signed-off-by: wayner0628 <[email protected]>
Signed-off-by: wayner0628 <[email protected]>
@wayner0628
Copy link
Contributor Author

wayner0628 commented Oct 8, 2024

Hi @Future-Outlier and @wild-endeavor,

I’ve been encountering an issue while running a Flytekit test case. The error I'm seeing is as follows:

[1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[flyte-copilot-downloader] terminated with exit code (1). Reason [Error]. Message: 
Type to use [iam, accesskey]. (default "iam")
      --storage.connection.disable-ssl             Disables SSL connection. Should only be used for development.
      --storage.connection.endpoint string         URL for storage client to connect to.
      --storage.connection.region string           Region to connect to. (default "us-east-1")
      --storage.connection.secret-key string       Secret to use when accesskey is set.
      --storage.container string                   Initial container (in s3 a bucket) to create -if it doesn't exist-.'
      --storage.defaultHttpClient.timeout string   Sets time out on the http client. (default "0s")
      --storage.enable-multicontainer              If this is true,  then the container argument is overlooked and redundant. This config will automatically open new connections to new containers/buckets as they are encountered
      --storage.limits.maxDownloadMBs int          Maximum allowed download size (in MBs) per call. (default 2)
      --storage.stow.config stringToString         Configuration for stow backend. Refer to github/graymeta/stow (default [])
      --storage.stow.kind string                   Kind of Stow backend to use. Refer to github/graymeta/stow
      --storage.type string                        Sets the type of storage to configure [s3/minio/local/mem/stow]. (default "s3")
      --tls-server-name string                     If provided, this name will be used to validate server certificate. If this is not provided, hostname used to contact the server is used.
      --token string                               Bearer token for authentication to the API server
      --user string                                The name of the kubeconfig user to use
      --username string                            Username for basic authentication to the API server
  -v, --v Level                                    number for the log level verbosity
      --vmodule moduleSpec                         comma-separated list of pattern=N settings for file-filtered logging

.

The error with flyte-copilot-downloader persists even when I run the raw_container.py example from the Flytesnacks using master branch:

flytectl demo start --dev
POD_NAMESPACE=flyte ./flyte start --config flyte-single-binary-local.yaml
pyflyte run --remote raw_container.py calculate_ellipse_area_shell --a 1.1 --b 1.2

Environment Details:

  • Flytekit: 1.13.7
  • Flyte: up-to-date Master branch
  • Config: No modifications from the master branch
  • Python 3.9.12
  • go version go1.22.4 darwin/arm64

I build, tag and push the modified docker image when testing this PR, but I did not use modified image for the Flytesnacks, it still failed.

This has been blocking me for a couple of weeks now. I’ll continue investigating, but any help or guidance you could provide would be greatly appreciated!

Thank you in advance.

@Future-Outlier
Copy link
Member

Hi @Future-Outlier and @wild-endeavor,

I’ve been encountering an issue while running a Flytekit test case. The error I'm seeing is as follows:

[1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[flyte-copilot-downloader] terminated with exit code (1). Reason [Error]. Message: 
Type to use [iam, accesskey]. (default "iam")
      --storage.connection.disable-ssl             Disables SSL connection. Should only be used for development.
      --storage.connection.endpoint string         URL for storage client to connect to.
      --storage.connection.region string           Region to connect to. (default "us-east-1")
      --storage.connection.secret-key string       Secret to use when accesskey is set.
      --storage.container string                   Initial container (in s3 a bucket) to create -if it doesn't exist-.'
      --storage.defaultHttpClient.timeout string   Sets time out on the http client. (default "0s")
      --storage.enable-multicontainer              If this is true,  then the container argument is overlooked and redundant. This config will automatically open new connections to new containers/buckets as they are encountered
      --storage.limits.maxDownloadMBs int          Maximum allowed download size (in MBs) per call. (default 2)
      --storage.stow.config stringToString         Configuration for stow backend. Refer to github/graymeta/stow (default [])
      --storage.stow.kind string                   Kind of Stow backend to use. Refer to github/graymeta/stow
      --storage.type string                        Sets the type of storage to configure [s3/minio/local/mem/stow]. (default "s3")
      --tls-server-name string                     If provided, this name will be used to validate server certificate. If this is not provided, hostname used to contact the server is used.
      --token string                               Bearer token for authentication to the API server
      --user string                                The name of the kubeconfig user to use
      --username string                            Username for basic authentication to the API server
  -v, --v Level                                    number for the log level verbosity
      --vmodule moduleSpec                         comma-separated list of pattern=N settings for file-filtered logging

.

The error with flyte-copilot-downloader persists even when I run the raw_container.py example from the Flytesnacks using master branch:

flytectl demo start --dev
POD_NAMESPACE=flyte ./flyte start --config flyte-single-binary-local.yaml
pyflyte run --remote raw_container.py calculate_ellipse_area_shell --a 1.1 --b 1.2

Environment Details:

  • Flytekit: 1.13.7
  • Flyte: up-to-date Master branch
  • Config: No modifications from the master branch
  • Python 3.9.12
  • go version go1.22.4 darwin/arm64

I build, tag and push the modified docker image when testing this PR, but I did not use modified image for the Flytesnacks, it still failed.

This has been blocking me for a couple of weeks now. I’ll continue investigating, but any help or guidance you could provide would be greatly appreciated!

Thank you in advance.

Can you show me your config file?

@wayner0628
Copy link
Contributor Author

It's the original one, I used to run Flytesnacks

plugins:
  logs:
    kubernetes-enabled: true
    kubernetes-template-uri: http://localhost:30080/kubernetes-dashboard/#/log/{{.namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}
    cloudwatch-enabled: false
    stackdriver-enabled: false
  k8s:
    default-env-vars:
      - FLYTE_AWS_ENDPOINT: http://flyte-sandbox-minio.flyte:9000
      - FLYTE_AWS_ACCESS_KEY_ID: minio
      - FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage
    image-pull-policy: Always # Helps in better iteration of flytekit changes
  k8s-array:
    logs:
      config:
        kubernetes-enabled: true
        kubernetes-template-uri: http://localhost:30080/kubernetes-dashboard/#/log/{{.namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}
        cloudwatch-enabled: false
        stackdriver-enabled: false

@Future-Outlier
Copy link
Member

It's the original one, I used to run Flytesnacks

plugins:
  logs:
    kubernetes-enabled: true
    kubernetes-template-uri: http://localhost:30080/kubernetes-dashboard/#/log/{{.namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}
    cloudwatch-enabled: false
    stackdriver-enabled: false
  k8s:
    default-env-vars:
      - FLYTE_AWS_ENDPOINT: http://flyte-sandbox-minio.flyte:9000
      - FLYTE_AWS_ACCESS_KEY_ID: minio
      - FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage
    image-pull-policy: Always # Helps in better iteration of flytekit changes
  k8s-array:
    logs:
      config:
        kubernetes-enabled: true
        kubernetes-template-uri: http://localhost:30080/kubernetes-dashboard/#/log/{{.namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}
        cloudwatch-enabled: false
        stackdriver-enabled: false

you have to add co-pilot image.

  k8s:
    default-env-vars:
      - FLYTE_AWS_ENDPOINT: "http://flyte-sandbox-minio.flyte:9000"
      - FLYTE_AWS_ACCESS_KEY_ID: minio
      - FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage
      - MLFLOW_TRACKING_URI: postgresql+psycopg2://postgres:@postgres.flyte.svc.cluster.local:5432/flyteadmin
    co-pilot:
          image: "cr.flyte.org/flyteorg/flytecopilot:v1.13.1"

@wayner0628
Copy link
Contributor Author

@Future-Outlier , I'll try it later, thank you

@wayner0628
Copy link
Contributor Author

@Future-Outlier , I add copilot image image: "cr.flyte.org/flyteorg/flytecopilot:v1.13.1" to the config, but the task still failed, do I need to push the image, or do anything else?

@Future-Outlier
Copy link
Member

@Future-Outlier , I add copilot image image: "cr.flyte.org/flyteorg/flytecopilot:v1.13.1" to the config, but the task still failed, do I need to push the image, or do anything else?

@wayner0628 show me your python code and show your whole k8s config.

@wayner0628
Copy link
Contributor Author

@Future-Outlier , I add copilot image image: "cr.flyte.org/flyteorg/flytecopilot:v1.13.1" to the config, but the task still failed, do I need to push the image, or do anything else?

@wayner0628 show me your python code and show your whole k8s config.

import logging

from flytekit import ContainerTask, kwtypes, task, workflow

logger = logging.getLogger(__file__)

# A `flytekit.ContainerTask` denotes an arbitrary container. In the following example, the name of the task
# is `calculate_ellipse_area_shell`. This name has to be unique in the entire project. Users can specify:
#
# - `input_data_dir` -> where inputs will be written to.
# - `output_data_dir` -> where Flyte will expect the outputs to exist.

# `inputs` and `outputs` specify the interface for the task; thus it should be an ordered dictionary of typed input and
# output variables.
calculate_ellipse_area_shell = ContainerTask(
    name="ellipse-area-metadata-shell",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-shell:v2",
    command=[
        "./calculate-ellipse-area.sh",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)

calculate_ellipse_area_python = ContainerTask(
    name="ellipse-area-metadata-python",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-python:v2",
    command=[
        "python",
        "calculate-ellipse-area.py",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)

calculate_ellipse_area_r = ContainerTask(
    name="ellipse-area-metadata-r",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-r:v2",
    command=[
        "Rscript",
        "--vanilla",
        "calculate-ellipse-area.R",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)

calculate_ellipse_area_haskell = ContainerTask(
    name="ellipse-area-metadata-haskell",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-haskell:v2",
    command=[
        "./calculate-ellipse-area",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)

calculate_ellipse_area_julia = ContainerTask(
    name="ellipse-area-metadata-julia",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-julia:v2",
    command=[
        "julia",
        "calculate-ellipse-area.jl",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)


@task
def report_all_calculated_areas(
    area_shell: float,
    metadata_shell: str,
    area_python: float,
    metadata_python: str,
    area_r: float,
    metadata_r: str,
    area_haskell: float,
    metadata_haskell: str,
    area_julia: float,
    metadata_julia: str,
):
    logger.info(f"shell: area={area_shell}, metadata={metadata_shell}")
    logger.info(f"python: area={area_python}, metadata={metadata_python}")
    logger.info(f"r: area={area_r}, metadata={metadata_r}")
    logger.info(f"haskell: area={area_haskell}, metadata={metadata_haskell}")
    logger.info(f"julia: area={area_julia}, metadata={metadata_julia}")


# If you’re using Flytekit version >= v1.11.1, you can execute it locally.
# For example, `pyflyte run raw_container.py calculate_ellipse_area_shell --a 1.1 --b 1.2`
#
# As can be seen in this example, `ContainerTask`s can be interacted with like normal Python functions, whose inputs
# correspond to the declared input variables. All data returned by the tasks are consumed and logged by a Flyte task.
@workflow
def wf(a: float, b: float):
    # Calculate area in all languages
    area_shell, metadata_shell = calculate_ellipse_area_shell(a=a, b=b)
    area_python, metadata_python = calculate_ellipse_area_python(a=a, b=b)
    area_r, metadata_r = calculate_ellipse_area_r(a=a, b=b)
    area_haskell, metadata_haskell = calculate_ellipse_area_haskell(a=a, b=b)
    area_julia, metadata_julia = calculate_ellipse_area_julia(a=a, b=b)

    # Report on all results in a single task to simplify comparison
    report_all_calculated_areas(
        area_shell=area_shell,
        metadata_shell=metadata_shell,
        area_python=area_python,
        metadata_python=metadata_python,
        area_r=area_r,
        metadata_r=metadata_r,
        area_haskell=area_haskell,
        metadata_haskell=metadata_haskell,
        area_julia=area_julia,
        metadata_julia=metadata_julia,
    )

@Future-Outlier , k8s config, you mean flyte-single-binary-local.yaml ?

# This is a sample configuration file for running single-binary Flyte locally against
# a sandbox.
admin:
  # This endpoint is used by flytepropeller to talk to admin
  # and artifacts to talk to admin,
  # and _also_, admin to talk to artifacts
  endpoint: localhost:30080
  insecure: true

catalog-cache:
  endpoint: localhost:8081
  insecure: true
  type: datacatalog

cluster_resources:
  standaloneDeployment: false
  templatePath: $HOME/.flyte/sandbox/cluster-resource-templates

logger:
  show-source: true
  level: 5

propeller:
  create-flyteworkflow-crd: true
  kube-config: $HOME/.flyte/sandbox/kubeconfig
  rawoutput-prefix: s3://my-s3-bucket/data

server:
  kube-config: $HOME/.flyte/sandbox/kubeconfig

webhook:
  certDir: $HOME/.flyte/webhook-certs
  localCert: true
  secretName: flyte-sandbox-webhook-secret
  serviceName: flyte-sandbox-local
  servicePort: 9443

tasks:
  task-plugins:
    enabled-plugins:
      - container
      - sidecar
      - K8S-ARRAY
      - agent-service
      - echo
    default-for-task-types:
      - container: container
      - container_array: K8S-ARRAY

plugins:
  logs:
    kubernetes-enabled: true
    kubernetes-template-uri: http://localhost:30080/kubernetes-dashboard/#/log/{{.namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}
    cloudwatch-enabled: false
    stackdriver-enabled: false
  k8s:
    default-env-vars:
      - FLYTE_AWS_ENDPOINT: "http://flyte-sandbox-minio.flyte:9000"
      - FLYTE_AWS_ACCESS_KEY_ID: minio
      - FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage
      - MLFLOW_TRACKING_URI: postgresql+psycopg2://postgres:@postgres.flyte.svc.cluster.local:5432/flyteadmin
    co-pilot:
      image: "cr.flyte.org/flyteorg/flytecopilot:v1.13.1"
    image-pull-policy: Always # Helps in better iteration of flytekit changes
  k8s-array:
    logs:
      config:
        kubernetes-enabled: true
        kubernetes-template-uri: http://localhost:30080/kubernetes-dashboard/#/log/{{.namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}
        cloudwatch-enabled: false
        stackdriver-enabled: false

database:
  postgres:
    username: postgres
    password: postgres
    host: 127.0.0.1
    port: 30001
    dbname: flyte
    options: "sslmode=disable"
storage:
  type: stow
  stow:
    kind: s3
    config:
      region: us-east-1
      disable_ssl: true
      v2_signing: true
      endpoint: http://localhost:30002
      auth_type: accesskey
      access_key_id: minio
      secret_key: miniostorage
  container: my-s3-bucket

task_resources:
  defaults:
    cpu: 500m
    memory: 500Mi
  limits:
    cpu: 4
    memory: 4Gi

@Future-Outlier
Copy link
Member

@Future-Outlier , I add copilot image image: "cr.flyte.org/flyteorg/flytecopilot:v1.13.1" to the config, but the task still failed, do I need to push the image, or do anything else?

@wayner0628 show me your python code and show your whole k8s config.

import logging

from flytekit import ContainerTask, kwtypes, task, workflow

logger = logging.getLogger(__file__)

# A `flytekit.ContainerTask` denotes an arbitrary container. In the following example, the name of the task
# is `calculate_ellipse_area_shell`. This name has to be unique in the entire project. Users can specify:
#
# - `input_data_dir` -> where inputs will be written to.
# - `output_data_dir` -> where Flyte will expect the outputs to exist.

# `inputs` and `outputs` specify the interface for the task; thus it should be an ordered dictionary of typed input and
# output variables.
calculate_ellipse_area_shell = ContainerTask(
    name="ellipse-area-metadata-shell",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-shell:v2",
    command=[
        "./calculate-ellipse-area.sh",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)

calculate_ellipse_area_python = ContainerTask(
    name="ellipse-area-metadata-python",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-python:v2",
    command=[
        "python",
        "calculate-ellipse-area.py",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)

calculate_ellipse_area_r = ContainerTask(
    name="ellipse-area-metadata-r",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-r:v2",
    command=[
        "Rscript",
        "--vanilla",
        "calculate-ellipse-area.R",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)

calculate_ellipse_area_haskell = ContainerTask(
    name="ellipse-area-metadata-haskell",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-haskell:v2",
    command=[
        "./calculate-ellipse-area",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)

calculate_ellipse_area_julia = ContainerTask(
    name="ellipse-area-metadata-julia",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-julia:v2",
    command=[
        "julia",
        "calculate-ellipse-area.jl",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)


@task
def report_all_calculated_areas(
    area_shell: float,
    metadata_shell: str,
    area_python: float,
    metadata_python: str,
    area_r: float,
    metadata_r: str,
    area_haskell: float,
    metadata_haskell: str,
    area_julia: float,
    metadata_julia: str,
):
    logger.info(f"shell: area={area_shell}, metadata={metadata_shell}")
    logger.info(f"python: area={area_python}, metadata={metadata_python}")
    logger.info(f"r: area={area_r}, metadata={metadata_r}")
    logger.info(f"haskell: area={area_haskell}, metadata={metadata_haskell}")
    logger.info(f"julia: area={area_julia}, metadata={metadata_julia}")


# If you’re using Flytekit version >= v1.11.1, you can execute it locally.
# For example, `pyflyte run raw_container.py calculate_ellipse_area_shell --a 1.1 --b 1.2`
#
# As can be seen in this example, `ContainerTask`s can be interacted with like normal Python functions, whose inputs
# correspond to the declared input variables. All data returned by the tasks are consumed and logged by a Flyte task.
@workflow
def wf(a: float, b: float):
    # Calculate area in all languages
    area_shell, metadata_shell = calculate_ellipse_area_shell(a=a, b=b)
    area_python, metadata_python = calculate_ellipse_area_python(a=a, b=b)
    area_r, metadata_r = calculate_ellipse_area_r(a=a, b=b)
    area_haskell, metadata_haskell = calculate_ellipse_area_haskell(a=a, b=b)
    area_julia, metadata_julia = calculate_ellipse_area_julia(a=a, b=b)

    # Report on all results in a single task to simplify comparison
    report_all_calculated_areas(
        area_shell=area_shell,
        metadata_shell=metadata_shell,
        area_python=area_python,
        metadata_python=metadata_python,
        area_r=area_r,
        metadata_r=metadata_r,
        area_haskell=area_haskell,
        metadata_haskell=metadata_haskell,
        area_julia=area_julia,
        metadata_julia=metadata_julia,
    )

@Future-Outlier , k8s config, you mean flyte-single-binary-local.yaml ?

# This is a sample configuration file for running single-binary Flyte locally against
# a sandbox.
admin:
  # This endpoint is used by flytepropeller to talk to admin
  # and artifacts to talk to admin,
  # and _also_, admin to talk to artifacts
  endpoint: localhost:30080
  insecure: true

catalog-cache:
  endpoint: localhost:8081
  insecure: true
  type: datacatalog

cluster_resources:
  standaloneDeployment: false
  templatePath: $HOME/.flyte/sandbox/cluster-resource-templates

logger:
  show-source: true
  level: 5

propeller:
  create-flyteworkflow-crd: true
  kube-config: $HOME/.flyte/sandbox/kubeconfig
  rawoutput-prefix: s3://my-s3-bucket/data

server:
  kube-config: $HOME/.flyte/sandbox/kubeconfig

webhook:
  certDir: $HOME/.flyte/webhook-certs
  localCert: true
  secretName: flyte-sandbox-webhook-secret
  serviceName: flyte-sandbox-local
  servicePort: 9443

tasks:
  task-plugins:
    enabled-plugins:
      - container
      - sidecar
      - K8S-ARRAY
      - agent-service
      - echo
    default-for-task-types:
      - container: container
      - container_array: K8S-ARRAY

plugins:
  logs:
    kubernetes-enabled: true
    kubernetes-template-uri: http://localhost:30080/kubernetes-dashboard/#/log/{{.namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}
    cloudwatch-enabled: false
    stackdriver-enabled: false
  k8s:
    default-env-vars:
      - FLYTE_AWS_ENDPOINT: "http://flyte-sandbox-minio.flyte:9000"
      - FLYTE_AWS_ACCESS_KEY_ID: minio
      - FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage
      - MLFLOW_TRACKING_URI: postgresql+psycopg2://postgres:@postgres.flyte.svc.cluster.local:5432/flyteadmin
    co-pilot:
      image: "cr.flyte.org/flyteorg/flytecopilot:v1.13.1"
    image-pull-policy: Always # Helps in better iteration of flytekit changes
  k8s-array:
    logs:
      config:
        kubernetes-enabled: true
        kubernetes-template-uri: http://localhost:30080/kubernetes-dashboard/#/log/{{.namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}
        cloudwatch-enabled: false
        stackdriver-enabled: false

database:
  postgres:
    username: postgres
    password: postgres
    host: 127.0.0.1
    port: 30001
    dbname: flyte
    options: "sslmode=disable"
storage:
  type: stow
  stow:
    kind: s3
    config:
      region: us-east-1
      disable_ssl: true
      v2_signing: true
      endpoint: http://localhost:30002
      auth_type: accesskey
      access_key_id: minio
      secret_key: miniostorage
  container: my-s3-bucket

task_resources:
  defaults:
    cpu: 500m
    memory: 500Mi
  limits:
    cpu: 4
    memory: 4Gi

Thank you, I just tested it, and found that this is break by others....

@wayner0628
Copy link
Contributor Author

Thanks for the insights, @Future-Outlier. I was able to test my copilot image and noticed an error in write_flytedir.py in your example :

Error: s3:/my-s3-bucket/data/9x/al2ts9fhllvlghff6dx6-n0-0/a0a112e1a40ca0f61ee2b4e6f4d3911c does not exist or is not a directory.

It seems like there’s a missing / after s3:/, but for corresponding FlyteDirectory and multi-part blob there is no such missing. I'm currently working on resolving this. Since testing with the Copilot setup takes some time, I appreciate your patience as I continue to investigate. Thanks!

@Future-Outlier
Copy link
Member

Thanks for the insights, @Future-Outlier. I was able to test my copilot image and noticed an error in write_flytedir.py in your example :


Error: s3:/my-s3-bucket/data/9x/al2ts9fhllvlghff6dx6-n0-0/a0a112e1a40ca0f61ee2b4e6f4d3911c does not exist or is not a directory.

It seems like there’s a missing / after s3:/, but for corresponding FlyteDirectory and multi-part blob there is no such missing. I'm currently working on resolving this. Since testing with the Copilot setup takes some time, I appreciate your patience as I continue to investigate. Thanks!

No problem, reply here anytime, I'll be there

@wayner0628
Copy link
Contributor Author

Hi @Future-Outlier, I have addressed the technical aspects of the issues, but I need some clarification on a couple of conceptual points before I can finalize the PR:

  1. Memory Storage List API: In the unit test function TestHandleBlobMultipart, the memory storage List API did not align with the implementation in the stow storage, which should use an "absolute path." I've made updates to both the API and the downloader in this behavior. ✅

  2. Error Explanation:

    Error: s3:/my-s3-bucket/data/9x/al2ts9fhllvlghff6dx6-n0-0/a0a112e1a40ca0f61ee2b4e6f4d3911c does not exist or is not a directory.
    

    This error seems to be related to input_path = Path(sys.argv[1]), where pathlib removes the second slash. However, I'm confused about one part: "why does the Container Task receive an S3 path directly instead of a relative path that it could use to copy the input directory?"

Once I have clarity on these points, I believe I can deliver the final PR very quickly. Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants