-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathfinal_project.py
56 lines (44 loc) · 1.41 KB
/
final_project.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from datetime import datetime, timedelta
import pendulum
import os
from airflow.decorators import dag
from airflow.operators.dummy import DummyOperator
from operators import (StageToRedshiftOperator, LoadFactOperator,
LoadDimensionOperator, DataQualityOperator)
from helpers import SqlQueries
default_args = {
'owner': 'udacity',
'start_date': pendulum.now(),
}
@dag(
default_args=default_args,
description='Load and transform data in Redshift with Airflow',
schedule_interval='0 * * * *'
)
def final_project():
start_operator = DummyOperator(task_id='Begin_execution')
stage_events_to_redshift = StageToRedshiftOperator(
task_id='Stage_events',
)
stage_songs_to_redshift = StageToRedshiftOperator(
task_id='Stage_songs',
)
load_songplays_table = LoadFactOperator(
task_id='Load_songplays_fact_table',
)
load_user_dimension_table = LoadDimensionOperator(
task_id='Load_user_dim_table',
)
load_song_dimension_table = LoadDimensionOperator(
task_id='Load_song_dim_table',
)
load_artist_dimension_table = LoadDimensionOperator(
task_id='Load_artist_dim_table',
)
load_time_dimension_table = LoadDimensionOperator(
task_id='Load_time_dim_table',
)
run_quality_checks = DataQualityOperator(
task_id='Run_data_quality_checks',
)
final_project_dag = final_project()