- Clone the repository.
- `cd` into the repository directory.
- Install the dependencies by running the following in your terminal or command prompt:

  ```
  pip install -r requirements.txt
  ```

- Set up Apache Airflow to schedule the pipeline tasks (a minimal setup sketch follows this list).
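A minimal local Airflow setup, assuming Airflow 2.x installed from the requirements file, might look like the following; adjust `AIRFLOW_HOME` to your environment:

```bash
# Point Airflow at its working directory (defaults to ~/airflow if unset).
export AIRFLOW_HOME=~/airflow

# Initialize the Airflow metadata database.
airflow db init

# Start the webserver and the scheduler (in two separate terminals);
# `airflow standalone` is an all-in-one alternative for development.
airflow webserver --port 8080
airflow scheduler
```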
To run the `high_risk_transaction_detection` function on a schedule, wrap the pipeline in an Airflow DAG. Create a file named `big_data_dag.py` in the `./dags` folder of the Airflow working directory, with the following content:
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

from proj_repo.pipeline import data_scrape, processing

with DAG(dag_id="big_data_dag",
         start_date=datetime(2023, 12, 23),
         schedule_interval=timedelta(minutes=30),
         catchup=False) as dag:

    # Scrape and load the raw data.
    task1 = PythonOperator(
        task_id="load_data",
        python_callable=data_scrape.main,
    )

    # Process the data and train the model; with trigger_rule="all_success"
    # (the default), this task runs only if load_data succeeds.
    task2 = PythonOperator(
        task_id="train_model",
        python_callable=processing.process,
        trigger_rule="all_success",
    )

    task1 >> task2  # task2 runs after task1 completes
```
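Once `big_data_dag.py` is saved under `./dags`, the standard Airflow CLI can confirm that the DAG parses and exercise it once outside the scheduler:

```bash
# Verify the DAG was parsed without import errors.
airflow dags list

# Run the whole DAG once for a given logical date, without the scheduler.
airflow dags test big_data_dag 2023-12-23
```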
The Airflow working directory should be structured as follows:
```
├── airflow.cfg
├── airflow.db
├── airflow-webserver.pid
├── dags
│   ├── anomaly_detection_IForest.png
│   ├── anomaly_detection_user address.png
│   ├── big_data_dag.py
│   ├── hello_world_dag.py
│   ├── kafka_stream_dag.py
│   ├── proj_repo
│   ├── __pycache__
│   └── stream_data.csv
├── logs
│   ├── dag_id=big_data_dag
│   ├── dag_id=hello_world_dag
│   ├── dag_processor_manager
│   └── scheduler
├── venv
│   ├── bin
│   ├── generated
│   ├── include
│   ├── lib
│   ├── lib64 -> lib
│   ├── LICENSE.txt
│   ├── pyvenv.cfg
│   └── share
└── webserver_config.py
```
- Note: `proj_repo` is the cloned repository, placed inside the `dags` folder so the DAG can import from it.
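For example, assuming `<repo-url>` stands in for the actual repository URL, the project can be cloned straight into the `dags` folder so that the `from proj_repo.pipeline import ...` line in the DAG resolves:

```bash
# Clone the project into the Airflow dags folder as proj_repo
# (<repo-url> is a placeholder for the real repository URL).
cd "$AIRFLOW_HOME/dags"
git clone <repo-url> proj_repo
```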
- To view the dashboard, please visit the High Risk Transaction Detection Dashboard.
- Find our Telegram bot by its ID, @crypto_centic_bot, to get brief reports.
- A demonstration of the pipeline process: demo