Udemy #610

Open · wants to merge 19 commits into master

5 changes: 5 additions & 0 deletions .gitignore
@@ -24,3 +24,8 @@ sftp-config.json

# Python
__pycache__

# output files
store_files/location_wise_profit*
store_files/store_wise_profit*
store_files/clean_store_transactions*
24 changes: 24 additions & 0 deletions dags/assigment1.py
@@ -0,0 +1,24 @@
# Import the required libraries
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator


# Define the default arguments dictionary
args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 12, 2),  # you can change this start_date
    'retries': 1,
    'retry_delay': timedelta(seconds=10),
}

dag = DAG('Assignment_1', default_args=args)

# task1 creates a directory 'test_dir' inside the dags folder
task1 = BashOperator(task_id='create_directory', bash_command='mkdir -p ~/dags/test_dir', dag=dag)

# task2 gets the 'shasum' of the 'test_dir' directory
task2 = BashOperator(task_id='get_shasum', bash_command='shasum ~/dags/test_dir', dag=dag)

# Set up the operator relationship so that task1 runs before task2
task2.set_upstream(task1)
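
Note: `set_upstream` works, but the bitshift form reads left to right and is the more common style. A minimal sketch of the same two-task DAG using it (same operators and commands as above):

```python
# Minimal sketch: the same two-task pipeline using the >> dependency
# syntax, which is equivalent to task2.set_upstream(task1).
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

dag = DAG('assignment_1_sketch', start_date=datetime(2020, 12, 2))

t1 = BashOperator(task_id='create_directory',
                  bash_command='mkdir -p ~/dags/test_dir', dag=dag)
t2 = BashOperator(task_id='get_shasum',
                  bash_command='shasum ~/dags/test_dir', dag=dag)

t1 >> t2  # task1 runs before task2
```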
3 changes: 3 additions & 0 deletions dags/create_table.sql
@@ -0,0 +1,3 @@
CREATE TABLE IF NOT EXISTS clean_store_transactions(STORE_ID varchar(50),
STORE_LOCATION varchar(50), PRODUCT_CATEGORY varchar(50), PRODUCT_ID int, MRP float, CP float,
DISCOUNT float, SP float, DATE date);
26 changes: 26 additions & 0 deletions dags/datacleaner.py
@@ -0,0 +1,26 @@
def data_cleaner():

    import pandas as pd
    import re

    df = pd.read_csv("~/store_files_airflow/raw_store_transactions.csv")

    # Strip punctuation and surrounding whitespace from the store location
    def clean_store_location(st_loc):
        return re.sub(r'[^\w\s]', '', st_loc).strip()

    # Keep only the numeric part of the product id, if there is one
    def clean_product_id(pd_id):
        matches = re.findall(r'\d+', pd_id)
        if matches:
            return matches[0]
        return pd_id

    # Drop the leading '$' and convert the amount to a float
    def remove_dollar(amount):
        return float(amount.replace('$', ''))

    df['STORE_LOCATION'] = df['STORE_LOCATION'].map(clean_store_location)
    df['PRODUCT_ID'] = df['PRODUCT_ID'].map(clean_product_id)

    for to_clean in ['MRP', 'CP', 'DISCOUNT', 'SP']:
        df[to_clean] = df[to_clean].map(remove_dollar)

    df.to_csv('~/store_files_airflow/clean_store_transactions.csv', index=False)
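
Since `data_cleaner` reads and writes fixed paths, the cleaning rules are easiest to sanity-check in isolation. A small sketch with made-up sample values (not from the course data):

```python
# Hypothetical spot-check of the cleaning helpers on invented samples.
import re

def clean_store_location(st_loc):
    return re.sub(r'[^\w\s]', '', st_loc).strip()

def clean_product_id(pd_id):
    matches = re.findall(r'\d+', pd_id)
    return matches[0] if matches else pd_id

def remove_dollar(amount):
    return float(amount.replace('$', ''))

print(clean_store_location(' New York, NY! '))  # -> 'New York NY'
print(clean_product_id('SKU-1234'))             # -> '1234'
print(remove_dollar('$12.50'))                  # -> 12.5
```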
1 change: 1 addition & 0 deletions dags/insert_into_table.sql
@@ -0,0 +1 @@
LOAD DATA INFILE '/store_files_mysql/clean_store_transactions.csv' INTO TABLE clean_store_transactions FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' IGNORE 1 ROWS;
13 changes: 13 additions & 0 deletions dags/select_from_table.sql
@@ -0,0 +1,13 @@
SELECT DATE, STORE_LOCATION, ROUND((SUM(SP) - SUM(CP)), 2) AS lc_profit
FROM clean_store_transactions
WHERE DATE = '2019-11-26'
GROUP BY STORE_LOCATION
ORDER BY lc_profit DESC
INTO OUTFILE '/store_files_mysql/location_wise_profit.csv' FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n';

SELECT DATE, STORE_ID, ROUND((SUM(SP) - SUM(CP)), 2) AS st_profit
FROM clean_store_transactions
WHERE DATE = '2019-11-26'
GROUP BY STORE_ID
ORDER BY st_profit DESC
INTO OUTFILE '/store_files_mysql/store_wise_profit.csv' FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n';
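
These queries only write files under `/store_files_mysql`. For a quick interactive look at the same aggregation, something like the following should work from a Python shell in the webserver container; treat it as an untested sketch that assumes the `mysql_conn` connection this PR's DAG uses:

```python
# Sketch: preview the per-location profit aggregation via the Airflow
# MySQL hook instead of INTO OUTFILE (assumes the 'mysql_conn' connection).
from airflow.hooks.mysql_hook import MySqlHook

hook = MySqlHook(mysql_conn_id='mysql_conn')
rows = hook.get_records(
    "SELECT STORE_LOCATION, ROUND(SUM(SP) - SUM(CP), 2) AS lc_profit "
    "FROM clean_store_transactions "
    "WHERE DATE = '2019-11-26' "
    "GROUP BY STORE_LOCATION ORDER BY lc_profit DESC"
)
for location, profit in rows:
    print(location, profit)
```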
49 changes: 49 additions & 0 deletions dags/storedag.py
@@ -0,0 +1,49 @@
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.email_operator import EmailOperator

from datacleaner import data_cleaner

default_args = {
    'owner': 'Airflow',
    'start_date': datetime(2020, 12, 2),
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

# Evaluated when the DAG file is parsed, e.g. '2020-12-03:07H48M'
timestamp = datetime.strftime(datetime.now(), '%Y-%m-%d:%HH%MM')

dag = DAG('store_dag', default_args=default_args, schedule_interval='@daily', catchup=False)

t1 = BashOperator(task_id='check_file_exists', bash_command='shasum ~/store_files_airflow/raw_store_transactions.csv',
                  retries=2, retry_delay=timedelta(seconds=15), dag=dag)

t2 = PythonOperator(task_id='clean_raw_csv', python_callable=data_cleaner, dag=dag)

t3 = MySqlOperator(task_id='create_mysql_table', mysql_conn_id="mysql_conn", sql="create_table.sql", dag=dag)

t4 = MySqlOperator(task_id='insert_into_table', mysql_conn_id="mysql_conn", sql="insert_into_table.sql", dag=dag)

t5 = MySqlOperator(task_id='select_from_table', mysql_conn_id="mysql_conn", sql="select_from_table.sql", dag=dag)

t6 = BashOperator(task_id='move_file1',
                  bash_command='cat ~/store_files_airflow/location_wise_profit.csv && mv ~/store_files_airflow/location_wise_profit.csv ~/store_files_airflow/location_wise_profit_%s.csv' % timestamp, dag=dag)

t7 = BashOperator(task_id='move_file2',
                  bash_command='cat ~/store_files_airflow/store_wise_profit.csv && mv ~/store_files_airflow/store_wise_profit.csv ~/store_files_airflow/store_wise_profit_%s.csv' % timestamp, dag=dag)

t8 = EmailOperator(task_id='send_email',
                   to='[email protected]',
                   subject='Daily report generated',
                   html_content="""<h1>Congratulations! Your store reports are ready.</h1>""",
                   files=['/usr/local/airflow/store_files_airflow/location_wise_profit_%s.csv' % timestamp,
                          '/usr/local/airflow/store_files_airflow/store_wise_profit_%s.csv' % timestamp],
                   dag=dag)

t9 = BashOperator(task_id='rename_raw',
                  bash_command='mv ~/store_files_airflow/clean_store_transactions.csv ~/store_files_airflow/raw_store_transactions_%s.csv' % timestamp, dag=dag)

t1 >> t2 >> t3 >> t4 >> t5 >> [t6, t7] >> t8 >> t9
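
One caveat worth noting: `timestamp` is computed when the scheduler parses the file, not when the tasks run, and since the file is re-parsed continuously, t6/t7 and t8 may even render different values and the email attachment paths can miss the renamed files. A hedged alternative is to let Jinja substitute the execution date in the templated `bash_command` field:

```python
# Sketch: render the execution date at run time via templating instead
# of a module-level datetime.now(); {{ ds }} is the execution date
# (YYYY-MM-DD), substituted when the task actually runs.
t6_alt = BashOperator(
    task_id='move_file1_templated',
    bash_command='mv ~/store_files_airflow/location_wise_profit.csv '
                 '~/store_files_airflow/location_wise_profit_{{ ds }}.csv',
    dag=dag)
```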
2 changes: 1 addition & 1 deletion dags/tuto.py
@@ -10,7 +10,7 @@
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2015, 6, 1),
"start_date": datetime(2020, 12, 2),
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
22 changes: 21 additions & 1 deletion docker-compose-LocalExecutor.yml
@@ -11,21 +11,38 @@ services:
        max-size: 10m
        max-file: "3"

  mysql:
    image: mysql:5.7.27
    environment:
      - MYSQL_ROOT_PASSWORD=root
    volumes:
      - ./store_files:/store_files_mysql/
      - ./mysql.cnf:/etc/mysql/mysql.cnf

  webserver:
    image: puckel/docker-airflow:1.10.9
    restart: always
    depends_on:
      - postgres
      - mysql
    environment:
      - INSTALL_MYSQL=y
      - LOAD_EX=n
      - EXECUTOR=Local
      - AIRFLOW__SMTP__SMTP_HOST=smtp.gmail.com
      - [email protected]
      - AIRFLOW__SMTP__SMTP_PASSWORD=thecatinthehat
      - AIRFLOW__SMTP__SMTP_PORT=587
      - AIRFLOW__SMTP__SMTP_MAIL_FROM=Airflow
    logging:
      options:
        max-size: 10m
        max-file: "3"
    volumes:
      - ./dags:/usr/local/airflow/dags
      # - ./plugins:/usr/local/airflow/plugins
      - ./store_files:/usr/local/airflow/store_files_airflow
      - ./sql_files:/usr/local/airflow/sql_files
    ports:
      - "8080:8080"
    command: webserver
@@ -34,3 +51,6 @@ services:
      interval: 30s
      timeout: 30s
      retries: 3

  redis:
    image: "redis:3.2.7"
15 changes: 15 additions & 0 deletions docker.md
@@ -0,0 +1,15 @@
# Useful docker commands

## After code changes

Restart the stack so the changes are picked up:

```
docker-compose -f docker-compose-LocalExecutor.yml up -d
```

## Reset data

Tear the stack down; on the next `up` the containers start with fresh state:

```
docker-compose -f docker-compose-LocalExecutor.yml down
```
7 changes: 7 additions & 0 deletions mysql.cnf
@@ -0,0 +1,7 @@
!includedir /etc/mysql/conf.d/
!includedir /etc/mysql/mysql.conf.d/
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
secure-file-priv = ""
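
The empty `secure-file-priv` matters here: with MySQL 5.7's default setting, `INTO OUTFILE` and `LOAD DATA INFILE` are restricted to one server directory, and the SQL tasks above would fail. A quick, untested check that the override took effect, via the same Airflow connection the DAG uses:

```python
# Sketch: confirm secure_file_priv is empty (no path restriction);
# assumes the 'mysql_conn' Airflow connection from this PR.
from airflow.hooks.mysql_hook import MySqlHook

row = MySqlHook(mysql_conn_id='mysql_conn').get_first(
    "SHOW VARIABLES LIKE 'secure_file_priv'")
print(row)  # expected: ('secure_file_priv', '')
```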
23 changes: 23 additions & 0 deletions mysql.md
@@ -0,0 +1,23 @@
# MySQL Container

```bash
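# the container id will differ on your machine; find it with 'docker ps'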
docker exec -it 6ff2b20c1d5c bash
mysql -u root -p
use mysql;
show tables;
```

## Create connection

- see the DAG for the connection name: `mysql_conn_id="mysql_conn"`
- create a new connection named `mysql_conn` in the Airflow UI
- connection type `MySQL`, host `mysql`, schema `mysql`, login `root`, password `root` (the `MYSQL_ROOT_PASSWORD` from the compose file)

## Troubleshooting

```
cryptography.fernet.InvalidToken
```

- `InvalidToken` usually means Airflow can no longer decrypt the stored connection credentials (for example, after the Fernet key changed); check the connection and recreate it if needed.
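
If the error persists, one way to rebuild the connection from scratch is through the metadata DB, e.g. from a Python shell in the webserver container. An illustrative sketch; the values mirror this PR's compose file:

```python
# Sketch: drop and recreate the 'mysql_conn' connection; host/login
# values are the ones this PR's docker-compose sets up.
from airflow import settings
from airflow.models import Connection

session = settings.Session()
session.query(Connection).filter(Connection.conn_id == 'mysql_conn').delete()
session.add(Connection(conn_id='mysql_conn', conn_type='mysql',
                       host='mysql', schema='mysql',
                       login='root', password='root'))
session.commit()
```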

5 changes: 5 additions & 0 deletions store_files/location_wise_profit_2020-12-03:07H48M.csv
@@ -0,0 +1,5 @@
2019-11-26,Washington,1515328.56
2019-11-26,New York,1403488.80
2019-11-26,Houston,1313427.36
2019-11-26,Miami,1242793.44
2019-11-26,Denver,1154795.76