From 30e035460322582299469ff7a959e73a19c64401 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 16 Feb 2024 08:25:22 -0800 Subject: [PATCH] Update async agent example (#4906) * Add sync agent example Signed-off-by: Kevin Su * nit Signed-off-by: Kevin Su --------- Signed-off-by: Kevin Su --- docs/flyte_agents/creating_an_agent.md | 28 ++++++--------------- docs/flyte_agents/testing_agents_locally.md | 6 +++++ 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/docs/flyte_agents/creating_an_agent.md b/docs/flyte_agents/creating_an_agent.md index d4aa953aeb..64f9d6c634 100644 --- a/docs/flyte_agents/creating_an_agent.md +++ b/docs/flyte_agents/creating_an_agent.md @@ -43,7 +43,7 @@ class Metadata: # If you add s3 file path, the agent will check if the file exists. job_id: str -class CustomAgent(AgentBase): +class CustomAsyncAgent(AsyncAgentBase): def __init__(self, task_type: str): # Each agent should have a unique task type. # The Flyte agent service will use the task type @@ -52,10 +52,10 @@ class CustomAgent(AgentBase): def create( self, - context: grpc.ServicerContext, output_prefix: str, task_template: TaskTemplate, inputs: typing.Optional[LiteralMap] = None, + **kwargs, ) -> TaskCreateResponse: # 1. Submit the task to the external service (BigQuery, DataBricks, etc.) # 2. Create metadata for the task, such as jobID. @@ -63,7 +63,7 @@ class CustomAgent(AgentBase): res = requests.post(url, json=data) return CreateTaskResponse(resource_meta=json.dumps(asdict(Metadata(job_id=str(res.job_id)))).encode("utf-8")) - def get(self, context: grpc.ServicerContext, resource_meta: bytes) -> TaskGetResponse: + def get(self, resource_meta: bytes, **kwargs) -> TaskGetResponse: # 1. Deserialize the metadata. # 2. Use the metadata to get the job status. # 3. Return the job status. @@ -71,29 +71,15 @@ class CustomAgent(AgentBase): res = requests.get(url, json={"job_id": metadata.job_id}) return GetTaskResponse(resource=Resource(state=res.state) - def delete(self, context: grpc.ServicerContext, resource_meta: bytes) -> TaskDeleteResponse: + def delete(self, resource_meta: bytes, **kwargs) -> TaskDeleteResponse: # 1. Deserialize the metadata. # 2. Use the metadata to delete the job. - # 3. If failed to delete the job, add the error message to the grpc context. - # context.set_code(grpc.StatusCode.INTERNAL) - # context.set_details(f"failed to create task with error {e}") - try: - metadata = Metadata(**json.loads(resource_meta.decode("utf-8"))) - requests.delete(url, json={"job_id": metadata.job_id}) - except Exception as e: - logger.error(f"failed to delete task with error {e}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"failed to delete task with error {e}") + metadata = Metadata(**json.loads(resource_meta.decode("utf-8"))) + requests.delete(url, json={"job_id": metadata.job_id}) return DeleteTaskResponse() # To register the custom agent -AgentRegistry.register(CustomAgent()) +AgentRegistry.register(CustomAsyncAgent()) ``` For an example implementation, see the [BigQuery Agent](https://github.com/flyteorg/flytekit/blob/9977aac26242ebbede8e00d476c2fbc59ac5487a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L35). - -## Sync agent interface specification - -To create a new async agent, extend the `AgentBase` class in the `flytekit.backend` module and implement the `execute` method. - -- `execute`: This method is used to initiate a new job and return the response. \ No newline at end of file diff --git a/docs/flyte_agents/testing_agents_locally.md b/docs/flyte_agents/testing_agents_locally.md index 3bc0bfa82f..d96da05d0a 100644 --- a/docs/flyte_agents/testing_agents_locally.md +++ b/docs/flyte_agents/testing_agents_locally.md @@ -13,6 +13,12 @@ Agents can be tested locally without configuring or running the backend server, The task inherited from `AsyncAgentExecutorMixin` can be executed locally, allowing flytekit to mimic FlytePropeller's behavior to call the agent. +```python +class BigQueryTask(AsyncAgentExecutorMixin, SQLTask[BigQueryConfig]): + def __init__(self, name: str, **kwargs): + ... +``` + :::{note} In some cases, you will need to store credentials in your local environment when testing locally.