
Support interactions with Airflow #836

Closed
michel-tricot opened this issue Nov 6, 2020 · 14 comments
Labels
type/enhancement New feature or request

Comments

@michel-tricot
Contributor

Tell us about the problem you're trying to solve

I would like Airflow to be able to trigger syncs on Airbyte, as well as having Airbyte trigger Airflow runs.

@michel-tricot michel-tricot added the type/enhancement New feature or request label Nov 6, 2020
@ryw

ryw commented Nov 12, 2020

With Airflow 2 launching, you might want to build this on the new Airflow API. The plan is to get Airflow 2 out in early December.

@pualien

pualien commented Dec 29, 2020

@ryw are there any new updates regarding the integration with Airflow? Airflow 2 was released a couple of weeks ago and seems to be pretty stable 🙂

@michel-tricot
Contributor Author

@pualien how would you see the AF2 integration working? Could you run me through a scenario? We are currently thinking about the design, and the more use cases we have the better!

@pualien

pualien commented Dec 30, 2020

@michel-tricot as you mentioned before, to be more specific: the best approach could be to have Airbyte operators and sensors in Airflow, in order to trigger connector executions and check for the end of a sync from within Airflow.
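
For illustration, a sensor along those lines could subclass Airflow's BaseSensorOperator and poke Airbyte's jobs endpoint until the sync finishes. A rough sketch (the class name, API URL, and payload shapes here are assumptions, not a final design):

import requests
from airflow.sensors.base import BaseSensorOperator


class AirbyteJobSensor(BaseSensorOperator):
    """Waits until an Airbyte sync job leaves the running/pending states."""

    def __init__(self, airbyte_api_url, job_id, **kwargs):
        super().__init__(**kwargs)
        self.airbyte_api_url = airbyte_api_url  # e.g. http://localhost:8000/api/v1 (assumed)
        self.job_id = job_id

    def poke(self, context):
        # /jobs/get returns the job payload, including its current status
        resp = requests.post(f"{self.airbyte_api_url}/jobs/get",
                             json={"id": self.job_id})
        resp.raise_for_status()
        status = resp.json()["job"]["status"]
        self.log.info("Airbyte job %s status: %s", self.job_id, status)
        return status not in ("running", "pending")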

@ChristopheDuong
Contributor

Could Airbyte jobs or syncs created through the Airbyte UI also be translated into Airflow DAGs dynamically?
https://www.astronomer.io/guides/dynamically-generating-dags
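
Something like this, as a rough sketch of that pattern (the API URL, workspace id, and placeholder task are assumptions for illustration):

import requests
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

AIRBYTE_API = "http://localhost:8000/api/v1"  # assumed local Airbyte instance
WORKSPACE_ID = "your-workspace-id"  # hypothetical placeholder

# Runs at DAG-file parse time: one Airflow DAG per Airbyte connection.
connections = requests.post(f"{AIRBYTE_API}/connections/list",
                            json={"workspaceId": WORKSPACE_ID}).json()["connections"]

for conn in connections:
    dag_id = f"airbyte_sync_{conn['connectionId']}"
    with DAG(dag_id=dag_id, schedule_interval="@daily",
             start_date=days_ago(1)) as dag:
        # Placeholder task; in practice this would be the sync operator
        # discussed later in this thread.
        DummyOperator(task_id="trigger_sync")
    globals()[dag_id] = dag  # register the generated DAG with Airflow's loader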

@marcosmarxm
Member

https://www.astronomer.io/blog/airflow-dbt-1 is another blog post by Astronomer. The second part is very interesting: it shows how they build dbt tasks within an Airflow DAG using a manifest.json.

With the Airbyte API, it is now possible to build an Airflow Operator for the integration. Can I help with that?

@michel-tricot
Contributor Author

Hi @marcosmarxm that would be amazing!!

How do you envision it working?

@marcosmarxm
Member

marcosmarxm commented Feb 23, 2021

@michel-tricot I imagine this integration should happen in stages. I made a draft, and the code is very primitive:
https://github.com/marcosmarxm/airflow-airbyte-interaction

The use case I considered in this draft is as follows (in the code I used the Money/JSON destination example from the getting-started guide):

I use Airflow as an orchestrator and Airbyte for integrations. For now, I create the sources/destinations/connections in the Airbyte UI and leave them on a manual schedule, then use Airflow to trigger the sync. I want to use the Airbyte source and destination names in Airflow to make my job easier.

I created an Airflow operator called AirbyteTriggerSyncOperator that receives the names of the source and destination. It is the responsibility of this operator to find the connectionId and trigger the synchronization in Airbyte. Briefly:

  1. call /workspaces/get_by_slug to get the workspaceId
  2. call /sources/list to get the sourceId based on the source name
  3. call /destinations/list to get the destinationId based on the destination name
  4. call /connections/list to get the connectionId based on the sourceId and destinationId
  5. call /connections/sync with the connectionId
  6. poll /jobs/get every second while the job status is running/pending
  7. finish when the job status is succeeded
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.airbyte import AirbyteTriggerSyncOperator  # in the future! now is plugin.operators :p

with DAG(dag_id='trigger_airbyte_connection',
         default_args={'owner': 'airflow'},
         schedule_interval='@daily',
         start_date=days_ago(2)) as dag:

    money_json = AirbyteTriggerSyncOperator(
        task_id='sync_money_json',
        airbyte_conn_id='airbyte_local',
        source_name='Money',
        dest_name='JSON destination'
    )

As I mentioned, it's a draft. However, it's quite simple and already "working".
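
Condensed, a hypothetical sketch of the seven steps above could look like this (simplified, not the draft code itself; the endpoint payload shapes follow the Airbyte API but are assumptions here):

import time
import requests


def trigger_airbyte_sync(api_url, workspace_slug, source_name, dest_name):
    """Hypothetical condensation of the operator logic described above."""

    def post(path, body):
        resp = requests.post(f"{api_url}{path}", json=body)
        resp.raise_for_status()
        return resp.json()

    # steps 1-4: resolve workspace, source, destination, then the connection
    workspace_id = post("/workspaces/get_by_slug", {"slug": workspace_slug})["workspaceId"]
    sources = post("/sources/list", {"workspaceId": workspace_id})["sources"]
    source_id = next(s["sourceId"] for s in sources if s["name"] == source_name)
    dests = post("/destinations/list", {"workspaceId": workspace_id})["destinations"]
    dest_id = next(d["destinationId"] for d in dests if d["name"] == dest_name)
    conns = post("/connections/list", {"workspaceId": workspace_id})["connections"]
    conn_id = next(c["connectionId"] for c in conns
                   if c["sourceId"] == source_id and c["destinationId"] == dest_id)

    # step 5: trigger the sync, which returns the created job
    job_id = post("/connections/sync", {"connectionId": conn_id})["job"]["id"]

    # steps 6-7: poll every second until the job leaves running/pending
    while True:
        status = post("/jobs/get", {"id": job_id})["job"]["status"]
        if status not in ("running", "pending"):
            return status == "succeeded"
        time.sleep(1)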

@marcosmarxm
Member

Looking at the Airflow documentation, it would be interesting to create a Hook in addition to the Operator, putting all the API access methods into it. If there are more Operators in the future, they would be able to reuse the methods from the Hook.
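
A minimal sketch of what such a Hook could look like, assuming it wraps Airflow's HttpHook (class and method names here are illustrative, not a final design):

from airflow.providers.http.hooks.http import HttpHook


class AirbyteHook(HttpHook):
    """Centralizes Airbyte API access so future Operators/Sensors can reuse it."""

    def __init__(self, airbyte_conn_id="airbyte_default"):
        super().__init__(method="POST", http_conn_id=airbyte_conn_id)

    def submit_sync_connection(self, connection_id):
        # triggers a sync for the given connection and returns the job payload
        return self.run(endpoint="api/v1/connections/sync",
                        json={"connectionId": connection_id},
                        headers={"accept": "application/json"})

    def get_job(self, job_id):
        # fetches the current state of a job, for polling by an Operator/Sensor
        return self.run(endpoint="api/v1/jobs/get",
                        json={"id": job_id},
                        headers={"accept": "application/json"})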

@michel-tricot
Contributor Author

Looks great!

Out of curiosity, why did you decide to configure the SyncOperator with the source name and destination name, instead of the connection id?

Also, where do you envision the operator living? Do you think it should be a separate project, or would it make sense to have it in the monorepo?

@marcosmarxm
Member

Using source and destination names as parameters:

airflow_task = AirbyteTriggerSyncOperator(
    task_id='sync_money_json',
    airbyte_conn_id='airbyte_conn_example',
    source_name='Money',
    dest_name='JSON destination'
)

Using the connectionId directly:

airflow_task = AirbyteTriggerSyncOperator(
    task_id='sync_money_json',
    airbyte_conn_id='airbyte_conn_example',
    connection_id='5ae6b09b-fdec-41af-aaf7-7d94cfc33ef6',
)

I used source_name and destination_name because I find them more convenient and readable.
As for the connectionId, I only managed to get that info from the URL on the connection page; is that correct?
Comparing the two examples above: a task in Airflow already requires a task_id, which can be given a friendly name, so whether the operator takes source names or a connectionId does not make much difference for readability there.

PROS:
Using the connectionId directly makes the integration with Airflow much simpler, because the operator can directly call the endpoint that triggers the job.

CONS:
Imagine the job has a problem and it is necessary to access the Airbyte UI for debugging. Having the source_name and destination_name gives a logical path to find my connection. If I only have the connectionId in Airflow, I may have trouble finding the connection.
(Of course, you can open the /connection URL and paste the connectionId, but it's not the nicest thing to do, IMHO.)


The AirbyteTriggerSyncOperator should live within the Airflow project as a provider package. People can then install it easily when setting up Airflow, as the extra-package requirement apache-airflow[airbyte].
Airbyte and Airflow are both evolving very fast. A repository could be created with the sample project and a CI job executed weekly/monthly to ensure that the integration keeps working.

@jrhizor
Contributor

jrhizor commented Feb 24, 2021

Sources (and destinations) can have duplicate names, so it's probably safer to refer to just the connectionId. Maybe the operator could internally retrieve the source and destination names/ids and log them at runtime for debugging purposes?

Developers could still use a descriptive task_id to document the source/destination?
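
For instance, a hypothetical helper along those lines (the function name and API URL are assumptions; the endpoints follow the Airbyte API):

import requests


def log_connection_details(api_url, connection_id, log):
    """Resolve and log the source/destination names behind a connectionId."""
    def post(path, body):
        resp = requests.post(f"{api_url}{path}", json=body)
        resp.raise_for_status()
        return resp.json()

    conn = post("/connections/get", {"connectionId": connection_id})
    source = post("/sources/get", {"sourceId": conn["sourceId"]})
    dest = post("/destinations/get", {"destinationId": conn["destinationId"]})
    log.info("Syncing source '%s' -> destination '%s' (connection %s)",
             source["name"], dest["name"], connection_id)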

@sherifnada
Contributor

@marcosmarxm can we close this?

@marcosmarxm
Member

I think so. I'll create another issue to expand the first Operator release to also support SSH connections.
