Use AssetExecutionContext to correctly pass in StravaAPIResource #11

Merged: 1 commit, Dec 23, 2024


jairus-m (Owner)

Summary

I created a Dagster resource to abstract the Strava API in #5. However, I didn't wire it up correctly. This PR updates the implementation so that Dagster actually recognizes it as a resource used by the asset.

Details

Before:

  • The StravaAPIResource I built is not recognized as a resource used by any asset.

  • This is because I used a module-level instance of the class directly, as a plain Python object, rather than passing the resource into the Definitions object and accessing it through the AssetExecutionContext.

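    import dlt
    from dagster import EnvVar, asset
    
    # strava_rest_api_config and logger are defined elsewhere in the project (not shown in this excerpt)
    from ..resources import StravaAPIResource, strava_api_resouce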
    
    @asset(key=["strava", "activities"], group_name="dltHub")
    def load_strava_activities():
        """
        dlt EL pipeline based off declarative Rest API Config
        to load raw Strava activities into DuckDB
        """
        duckdb_database_path = EnvVar("DUCKDB_DATABASE").get_value()
        logger.info(f"Dagster Env: {EnvVar('DAGSTER_ENVIRONMENT').get_value()}")
        logger.info(f"Writing to {duckdb_database_path}..")
        pipeline = dlt.pipeline(
            pipeline_name="strava_rest_config",
            destination=dlt.destinations.duckdb(duckdb_database_path),
            dataset_name="activities",
            progress="log",
        )
    
        source = strava_rest_api_config(strava_api_resouce) # this works but is not recognized by Dagster
    
        load_info = pipeline.run(source)
        logger.info(load_info)
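One way to see why Dagster reports no uses here: a framework can only track dependencies that are declared up front, and a module-level object referenced inside the function body is not declared anywhere. The toy model below is a hypothetical sketch, not Dagster's internals; `AssetSpec`, `count_resource_uses`, and the placeholder `strava_api_resource` object are invented for illustration.

```python
from dataclasses import dataclass
from typing import Callable, FrozenSet, Iterable

# Hypothetical placeholder for a module-level resource instance
# (stands in for strava_api_resouce; not the project's real object).
strava_api_resource = object()


@dataclass(frozen=True)
class AssetSpec:
    """Toy asset record: only what a framework can inspect without running the body."""
    name: str
    fn: Callable[[], object]
    required_resource_keys: FrozenSet[str] = frozenset()


def load_before() -> object:
    # Works at run time, but the dependency lives only inside the function body.
    return strava_api_resource


def count_resource_uses(specs: Iterable[AssetSpec], key: str) -> int:
    """Count assets that declare `key`; references to globals are invisible here."""
    return sum(1 for spec in specs if key in spec.required_resource_keys)


before = [AssetSpec("strava/activities", load_before)]
print(count_resource_uses(before, "strava"))  # 0
```

The asset still runs and returns the resource, which matches "this works but is not recognized by Dagster": the dependency is real, it just isn't visible to anything that inspects the asset definition.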

Note below that the 'strava' resource has 0 uses.

[Screenshot 2024-12-23 at 12:22:12 PM]

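After:

  • Dagster now recognizes StravaAPIResource as a resource used by the asset.

  • Declaring required_resource_keys={"strava"} on the asset lets it use the resource bound to the "strava" key in the Definitions object, accessed at run time through the AssetExecutionContext as context.resources.strava.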

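    import dlt
    from dagster import AssetExecutionContext, EnvVar, asset
    
    # strava_rest_api_config and logger are defined elsewhere in the project (not shown in this excerpt)
    
    @asset(
        key=["strava", "activities"],
        group_name="dltHub",
        required_resource_keys={"strava"},
    )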
    def load_strava_activities(context: AssetExecutionContext):
        """
        dlt EL pipeline based off declarative Rest API Config
        to load raw Strava activities into DuckDB
        """
        duckdb_database_path = EnvVar("DUCKDB_DATABASE").get_value()
        logger.info(f"Dagster Env: {EnvVar('DAGSTER_ENVIRONMENT').get_value()}")
        logger.info(f"Writing to {duckdb_database_path}..")
        pipeline = dlt.pipeline(
            pipeline_name="strava_rest_config",
            destination=dlt.destinations.duckdb(duckdb_database_path),
            dataset_name="activities",
            progress="log",
        )
    
        source = strava_rest_api_config(context.resources.strava)  # Dagster now tracks this resource dependency
    
        load_info = pipeline.run(source)
        logger.info(load_info)
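The pattern in the After code can be sketched without Dagster: resources are bound to keys in one place (standing in for the Definitions object), the asset declares the keys it needs, and the runner hands it a context exposing only those keys. This is a hypothetical toy model, not Dagster's implementation; `ToyContext`, `run_asset`, and the string placeholders are invented, though it is roughly analogous to how declaring `required_resource_keys` scopes what appears on `context.resources`.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict, Iterable


class ToyContext:
    """Hypothetical stand-in for an execution context (not Dagster's class)."""

    def __init__(self, resources: Dict[str, Any]) -> None:
        # Attribute-style access, e.g. context.resources.strava
        self.resources = SimpleNamespace(**resources)


def run_asset(
    fn: Callable[[ToyContext], Any],
    required_resource_keys: Iterable[str],
    bound_resources: Dict[str, Any],
) -> Any:
    """Run `fn` with a context that exposes only the resources it declared."""
    required = set(required_resource_keys)
    missing = required - set(bound_resources)
    if missing:
        # Fail before running if a declared key was never bound.
        raise KeyError(f"resources not bound: {sorted(missing)}")
    scoped = {key: bound_resources[key] for key in required}
    return fn(ToyContext(scoped))


def load_strava_activities(context: ToyContext) -> Any:
    # Mirrors context.resources.strava in the After code.
    return context.resources.strava


# Stand-in for the resources passed to Definitions (placeholder values).
bound = {"strava": "strava-client", "duckdb": "duckdb-handle"}
print(run_asset(load_strava_activities, {"strava"}, bound))  # strava-client
```

Because the requirement is declared rather than buried in the function body, the runner can both validate it before execution and report which assets depend on which resource.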

Note below that the 'strava' resource now has 2 uses:

  • 1 asset, 1 job

[Screenshot 2024-12-23 at 12:30:52 PM]
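The count of 2 presumably comes from the resource being required by both the asset and a job that materializes it (the job itself is not shown in this PR, so this is an assumption). A minimal toy sketch, assuming a job's requirements are the union of its selected assets' requirements; `AssetNode`, `JobNode`, `resource_uses`, and the job name `strava_job` are hypothetical.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Sequence, Tuple


@dataclass(frozen=True)
class AssetNode:
    name: str
    required_resource_keys: FrozenSet[str] = frozenset()


@dataclass(frozen=True)
class JobNode:
    name: str
    assets: Tuple[AssetNode, ...]

    @property
    def required_resource_keys(self) -> FrozenSet[str]:
        # Assumption: a job needs every resource its selected assets need.
        return frozenset().union(*(a.required_resource_keys for a in self.assets))


def resource_uses(
    key: str, assets: Sequence[AssetNode], jobs: Sequence[JobNode]
) -> Tuple[List[str], List[str]]:
    """Return the names of assets and jobs that require `key`."""
    asset_users = [a.name for a in assets if key in a.required_resource_keys]
    job_users = [j.name for j in jobs if key in j.required_resource_keys]
    return asset_users, job_users


activities = AssetNode("strava/activities", frozenset({"strava"}))
strava_job = JobNode("strava_job", (activities,))  # hypothetical job name
asset_users, job_users = resource_uses("strava", [activities], [strava_job])
print(len(asset_users), len(job_users))  # 1 1
```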

jairus-m changed the title from "Use AssetExecutionContext to pass StravaResourceAPI into assets" to "Use AssetExecutionContext to correctly pass in StravaAPIResource" on Dec 23, 2024.
jairus-m merged commit 4f885a0 into main on Dec 23, 2024.
3 checks passed