Commit 960393e in aws-blog-event-driven-batch-analytics
karthiksonti committed Sep 20, 2016 (parent commit: 7fea6b0)
Showing 21 changed files with 769 additions and 0 deletions.
aws-blog-event-driven-batch-analytics/README.md (new file, 137 additions)
# Building Event Driven Batch Analytics on AWS

## Overview

This repository contains the code that supports the accompanying [AWS Big Data Blog post](https://blogs.aws.amazon.com/bigdata/).

### Use Case Description
Yummy Foods has franchise stores all over the country. These franchise stores run on heterogeneous platforms, and they submit cumulative transaction files to Yummy Foods at various cadences throughout the day in tab-delimited .tdf format. Due to system limitations, some of these franchise stores occasionally send additional data lines starting with characters such as "----".

The requirement is to update insights on the sales made by each franchise for a given item throughout the day, as soon as the complete set of franchise files from a given province is available. The number of franchises per province is fixed and seldom changes.

The aggregation job for a given province should not be submitted until the configured number of franchise store files from that province is available and the product master data update has been posted at the beginning of the day. A master data update is identified by the presence of at least one "Item.csv" file for that day.

The aggregation job should consider only transaction codes 4 (sale amount), 5 (tax amount), and 6 (discount amount); the remaining codes can be ignored. Once the aggregation job completes, only one record should exist for each combination of franchise store, item, and transaction date, as sketched in the query below.
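
For illustration only, the desired output shape can be sketched as the following SQL, assuming a hypothetical staging table validated_transactions(vendor_id, item_id, trans_date, trans_code, amount). In this project the aggregation is actually implemented as a Spark job whose results are written to the vendortranssummary table in Amazon Redshift (see the Getting Started steps below).

```
select vendor_id,
       item_id,
       trans_date,
       sum(case when trans_code = 4 then amount else 0 end) as sale_amount,
       sum(case when trans_code = 5 then amount else 0 end) as tax_amount,
       sum(case when trans_code = 6 then amount else 0 end) as discount_amount
from validated_transactions
where trans_code in (4, 5, 6)
group by vendor_id, item_id, trans_date;
```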


### Prerequisites
1. Create a VPC with at least one private subnet "MyPrivateSubnet" and one public subnet "MyPublicSubnet".
2. Create a NAT gateway or NAT instance so that [Lambda functions in the private subnet](https://aws.amazon.com/blogs/aws/new-access-resources-in-a-vpc-from-your-lambda-functions/) can access the internet.
3. Create a role "myLambdaRole" with the AWSLambdaVPCAccessExecution, AWSLambdaRole, and ElasticMapReduceForEC2Role policies, as well as S3 and CloudWatch access policies.
4. Create a security group "MySecurityGroup" with the inbound MySQL (3306) and Redshift (5439) ports open.
5. A JAR file with all dependencies is already available in S3 at [s3://event-driven-batch-analytics/code/eventdrivenbatchanalytics.jar](s3://event-driven-batch-analytics/code/eventdrivenbatchanalytics.jar). Download it to your local environment.
6. If you wish to build your own JAR, download the MySQL JDBC driver and the Redshift JDBC driver and add them to your local Maven repository.

### Getting Started

1. Create an [Amazon RDS MySQL 5.7.x instance](http://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_GettingStarted.CreatingConnecting.MySQL.html).
2. Connect to the MySQL database instance through your preferred SQL client and execute the SQL statements in resources/edba_config_mysql.sql.
3. Create a two-node dc1.large [Amazon Redshift cluster](http://docs.aws.amazon.com/redshift/latest/mgmt/managing-clusters-console.html#create-cluster).
4. Connect to the cluster through your preferred SQL client and execute the statements in the resources/edba_redshift.sql file.
5. Update LambdaContainer.java with your MySQL endpoint and credentials.
6. Update ProcessVendorTransactions.java with your Redshift endpoint and credentials.
7. Build the JAR with the Maven Shade plugin. Execute this command from the directory where pom.xml is located:

```
mvn package
```
8. Create the S3 bucket:

```
aws s3 mb s3://event-driven-batch-analytics
```
9. Create the Validation/Conversion Layer Lambda function:

```
aws lambda create-function --function-name validateAndNormalizeInputData --zip-file fileb:///<<MyPath>>/eventdrivenbatchanalytics.jar --handler com.amazonaws.bigdatablog.edba.LambdaContainer::validateAndNormalizeInputData --role arn:aws:iam::<<myAccountNumber>>:role/<<myLambdaRole>> --runtime java8 --timeout 120
```
10. Provide S3 permissions to invoke the Validation/Conversion Layer Lambda function:

```
aws lambda add-permission --function-name validateAndNormalizeInputData --statement-id 2222 --action "lambda:InvokeFunction" --principal s3.amazonaws.com --source-arn arn:aws:s3:::event-driven-batch-analytics --source-account <<myAccountNumber>>
```
11. Create "Input Tracking Layer" lambda function

```
aws lambda create-function --function-name auditValidatedFile --zip-file fileb:///<<MyPath>>/eventdrivenbatchanalytics.jar --handler com.amazonaws.bigdatablog.edba.LambdaContainer::auditValidatedFile --role arn:aws:iam::<<myAccountNumber>>:role/<<myLambdaRole>> --runtime java8 --vpc-config '{"SubnetIds":["MyPrivateSubnet"],"SecurityGroupIds":["MySecurityGroup"]}' --memory-size 1024 --timeout 120
```
12. Provide S3 permissions to invoke the "Input Tracking Layer" Lambda function:

```
aws lambda add-permission --function-name auditValidatedFile --statement-id 2222 --action "lambda:InvokeFunction" --principal s3.amazonaws.com --source-arn arn:aws:s3:::event-driven-batch-analytics --source-account <<myAccountNumber>>
```
13. Configure events in S3 to trigger the "Validation/Conversion Layer" and "Input Tracking Layer" Lambda functions:

```
aws s3api put-bucket-notification-configuration --notification-configuration file:///<<MyPath>>/put-bucket-notification.json --bucket event-driven-batch-analytics
```
14. Create the EMR Job Submission Layer Lambda function. This function submits an EMR step when the configured criteria for a job have been met:

```
aws lambda create-function --function-name checkCriteriaFireEMR --zip-file fileb:///<<MyPath>>/eventdrivenbatchanalytics-0.0.1-SNAPSHOT.jar --handler com.amazonaws.bigdatablog.edba.LambdaContainer::checkConditionStatusAndFireEMRStep --role arn:aws:iam::<<myAccountNumber>>:role/<<myLambdaRole>> --runtime java8 \
--vpc-config '{"SubnetIds":["MyPrivateSubnet"],"SecurityGroupIds":["MySecurityGroup"]}' --memory-size 1024 --timeout 300
```
15. Schedule a CloudWatch Events rule to fire every 10 minutes and check whether any aggregation job submission criteria have been met:

```
aws events put-rule --name scheduledEMRJobRule --schedule-expression 'rate(10 minutes)'
```
16. Give the "scheduledEMRJobRule" CloudWatch Events rule permission to invoke the "checkCriteriaFireEMR" Lambda function:

```
aws lambda add-permission \
--function-name checkCriteriaFireEMR \
--statement-id checkCriteriaFireEMR \
--action 'lambda:InvokeFunction' \
--principal events.amazonaws.com --source-arn arn:aws:events:us-east-1:<<myAccountNumber>>:rule/scheduledEMRJobRule
```
17. Configure "checkCriteriaFireEMR" Lambda function as target for the "scheduledEMRJobRule" CloudWatch event rule

```
aws events put-targets --rule scheduledEMRJobRule --targets '{"Id" : "1", "Arn": "arn:aws:lambda:us-east-1:<<myAccountNumber>>:function:checkCriteriaFireEMR"}'
```
18. Create the EMR Job Monitoring Layer Lambda function. This function updates the AGGRJOBCONFIGURATION table with the status of a RUNNING EMR step:

```
aws lambda create-function --function-name monitorEMRAggregationJob --zip-file fileb:///<<MyPath>>/eventdrivenbatchanalytics-0.0.1-SNAPSHOT.jar --handler com.amazonaws.bigdatablog.edba.LambdaContainer::monitorEMRStep --role arn:aws:iam::<<myAccountNumber>>:role/<<myLambdaRole>> --runtime java8 \
--vpc-config '{"SubnetIds":["MyPrivateSubnet"],"SecurityGroupIds":["MySecurityGroup"]}' --memory-size 512 --timeout 300
```
19. Schedule a CloudWatch Events rule to monitor submitted EMR jobs every 15 minutes:

```
aws events put-rule --name monitorEMRJobRule --schedule-expression 'rate(15 minutes)'
```
20. Give the "monitorEMRJobRule" CloudWatch Events rule permission to invoke the "monitorEMRAggregationJob" Lambda function:

```
aws lambda add-permission \
--function-name monitorEMRAggregationJob \
--statement-id monitorEMRAggregationJob \
--action 'lambda:InvokeFunction' \
--principal events.amazonaws.com --source-arn arn:aws:events:us-east-1:<<myAccountNumber>>:rule/monitorEMRJobRule
```
21. Configure "monitorEMRAggregationJob" lambda function as target for "monitorEMRJobRule"

```
aws events put-targets --rule monitorEMRJobRule --targets '{"Id" : "1", "Arn": "arn:aws:lambda:us-east-1:<<myAccountNumber>>:function:monitorEMRAggregationJob"}'
```
22. Download the files from resources/sampledata/ to a local directory, and from that directory upload them to s3://event-driven-batch-analytics/ with the prefix data/source-identical/:

```
aws s3 sync . s3://event-driven-batch-analytics/data/source-identical/
```
23. Observe the timestamps of the CloudWatch logs being created and updated for each of the Lambda functions, and verify that no errors are recorded.
24. After around 10 minutes, connect through your MySQL client and verify whether any jobs have been submitted. The schedule interval determines the delay:

```
select job_config_id from aggrjobconfiguration where last_exec_status = 'RUNNING';
```
25. Verify that job configuration "J101" (vendor transactions posted from the state of Illinois) is in the "RUNNING" state, for example with the query shown below.
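
One way to check, using the schema defined in resources/edba_config_mysql.sql:

```
select job_config_id, last_exec_stepid, last_exec_status, last_run_timestamp
from aggrjobconfiguration
where job_config_id = 'J101';
```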

26. Connect to the Redshift cluster and verify that the "vendortranssummary" table is populated with the aggregated vendor transactions, for example with the query shown below.
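
A sample spot-check against the table created by resources/edba_redshift.sql (only one row should exist per vendor, item, and transaction date):

```
select vendor_id, item_id, trans_date, sale_amount, tax_amount, discount_amount
from public.vendortranssummary
order by vendor_id, item_id, trans_date
limit 10;
```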

27. If for any reason a job has failed, execute the query below to find the impacted files:

```
select t1.job_config_id, t2.file_url, t2.last_validated_timestamp
from aggrjobconfiguration t1
join ingestedfilestatus t2
  on json_contains(t2.submitted_jobs, json_array(t1.job_config_id)) = 1
where t1.last_exec_status = 'FAILED';
```
aws-blog-event-driven-batch-analytics/State-Management-Store.md (new file, 45 additions)


## State Management Store

A simple state management store can be accomplished with two tables: one for storing aggregation job configurations (AGGRJOBCONFIGURATION) and another for storing the ingested file status (INGESTEDFILESTATUS). The columns in each table and their descriptions follow. The DDL and the sample job configurations for the tracking store are in resources/edba_config_mysql.sql.

Sample records for AGGRJOBCONFIGURATION are also provided below as a reference. They indicate that the job for the state of Illinois will not be submitted until at least 13 vendor transaction files are posted and the Item master data is posted. A similar rule is configured for the state of California, with the exception that the minimum file count is 25.



### AGGRJOBCONFIGURATION
| Column | Description |
| ------ | -------------- |
| job_config_id | Job Configuration Identifier |
| job_input_pattern | The file pattern that this job cares about. The EMR Job Submission Layer Lambda function checks whether the timestamp of these files is later than the timestamp of the job's last run |
| job_min_file_count | The minimum number of files that should be collected before submitting this job |
| job_addl_criteria | In addition to the default timestamp check (explained above), if your job configuration needs additional criteria, express them in the form of a SQL statement |
| job_params | EMR step configuration parameters (for example: spark-submit, --deploy-mode cluster, --class, jar, etc.) |
| last_exec_stepid | The last submitted EMR step ID. A combination of clusterid:stepid is stored in this column |
| last_exec_status | The status (COMPLETED/FAILED/RUNNING) of the EMR step that was last submitted for this configuration |
| last_run_timestamp | The last time this job was run |



### INGESTEDFILESTATUS
| Column | Description |
| ------ | ------------ |
| file_url | The complete key of the input file including the bucket name |
| submitted_jobs | A JSON array of the jobs that were submitted with this file. When a new update of this file is received, this array is reset to null. By joining this column with the job_config_id column of the AGGRJOBCONFIGURATION table, the files related to a FAILED, RUNNING, or COMPLETED job can be obtained |
| last_update_status | Indicates whether the latest update on this file has been validated |
| last_validated_timestamp | The last time a valid update on this file was received |

These two tables are read by the Lambda functions in the EMR Job Submission and Monitoring Layers that we cover next. A variation of this design is to execute a code component for "additional_criteria" instead of a SQL statement, and perhaps also to extend it beyond EMR (for example, to a Data Pipeline job). The data models shown here are only an indication of how this layer can be used; you may need to tweak them to suit your specific needs.
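
As a rough sketch of how the submission check can be expressed against this store, the following query (using the MySQL schema in resources/edba_config_mysql.sql) returns 1 when enough new files have arrived for job "J101"; the actual check is implemented in the LambdaContainer Java code and may differ in detail. The job_addl_criteria statement (for example, the Item.csv freshness check in the sample records below) would be evaluated in addition to this count.

```
select ( (select count(*)
            from ingestedfilestatus
           where file_url like '%validated%IL%.csv'
             and last_validated_timestamp >
                 coalesce((select last_run_timestamp
                             from aggrjobconfiguration
                            where job_config_id = 'J101'),
                          '1970-01-01 00:00:01'))
         >=
         (select job_min_file_count
            from aggrjobconfiguration
           where job_config_id = 'J101') ) as ready_to_submit;
```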


### Sample AGGRJOBCONFIGURATION Records

Here are the sample records for the use case we are walking through. The job configuration "J101" indicates that there need to be at least 13 files collected from Illinois, identified by "IL" in the file prefix, and an update of the Item master data, identified by Item%.csv, posted in the last 24 hours.

The job configuration "J102" is similar to the configuration "J101" with the exception that the file prefix will have "CA" for California province files and the number of vendor transactions to be collected are at least 25

| job_config_id | job_input_pattern | job_min_file_count | job_params | job_addl_criteria | last_exec_stepid | last_exec_status | last_run_timestamp |
| ------------ | ---------------- | --------- | ----------- | ---------- | -------- | --------- | ---------- |
| J101 | ingestedfilestatus.file_url like %validated%IL%.csv | 13 | spark-submit,--deploy-mode,cluster,--class,com.amazonaws.bigdatablog.edba.emr.ProcessVendorTrasactions,s3://event-driven-batch-analytics/code/eventdrivenanalytics.jar,s3://event-driven-batch-analytics/validated/data/source-identical/IL*.csv | select 1 from ingestedfilestatus where file_url like '%Item%.csv' and last_validated_timestamp > current_timestamp - interval 1 day | | | |
| J102 | ingestedfilestatus.file_url like %validated%CA%.csv | 25 | spark-submit,--deploy-mode,cluster,--class,com.amazonaws.bigdatablog.edba.emr.ProcessVendorTrasactions,s3://event-driven-batch-analytics/code/eventdrivenanalytics.jar,s3://event-driven-batch-analytics/validated/data/source-identical/CA*.csv | select 1 from ingestedfilestatus where file_url like '%Item%.csv' and last_validated_timestamp > current_timestamp - interval 1 day | | | |
aws-blog-event-driven-batch-analytics/pom.xml (new file, 109 additions)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.amazonaws.bigdatablog</groupId>
<artifactId>eventdrivenbatchanalytics</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>eventdrivenbatchanalytics</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.10.58</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-emr</artifactId>
<version>1.10.58</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-redshift_2.10</artifactId>
<version>0.6.1</version>
</dependency>
<dependency>
<groupId>com.amazon</groupId>
<artifactId>redshift</artifactId>
<version>1.1.13</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>1.9.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
</plugin>
</plugins>
</build>

</project>
aws-blog-event-driven-batch-analytics/resources/edba_config_mysql.sql (new file, 24 additions)
create table edbaconfig.ingestedfilestatus(
file_url varchar(500) primary key,
submitted_jobs json,
last_update_status varchar(50),
last_validated_timestamp timestamp,
unique(file_url,last_update_status)
);

create table edbaconfig.aggrjobconfiguration(
job_config_id varchar(10) primary key,
job_input_pattern varchar(500),
job_min_file_count int,
job_addl_criteria varchar(1000),
job_params varchar(1000),
last_exec_stepid varchar(50),
last_exec_status varchar(20),
last_run_timestamp timestamp
);

insert into edbaconfig.aggrjobconfiguration(job_config_id,job_input_pattern,job_min_file_count,job_addl_criteria,job_params,last_exec_stepid,last_exec_status,last_run_timestamp)
values('J101','ingestedfilestatus.file_url like \'%validated%IL%.csv\'',13,'select 1 from ingestedfilestatus where file_url like \'%Item%.csv\' and last_validated_timestamp > current_timestamp - interval 1 day','spark-submit,--deploy-mode,cluster,--class,com.amazonaws.bigdatablog.edba.emr.ProcessVendorTrasactions,s3://event-driven-batch-analytics/code/eventdrivenanalytics.jar,s3://event-driven-batch-analytics/validated/data/source-identical/IL*.csv',null,null,null);

insert into edbaconfig.aggrjobconfiguration(job_config_id,job_input_pattern,job_min_file_count,job_addl_criteria,job_params,last_exec_stepid,last_exec_status,last_run_timestamp)
values('J102','ingestedfilestatus.file_url like \'%validated%CA%.csv\'',25,'select 1 from ingestedfilestatus where file_url like \'%Item%.csv\' and last_validated_timestamp > current_timestamp - interval 1 day','spark-submit,--deploy-mode,cluster,--class,com.amazonaws.bigdatablog.edba.emr.ProcessVendorTrasactions,s3://event-driven-batch-analytics/code/eventdrivenanalytics.jar,s3://event-driven-batch-analytics/validated/data/source-identical/CA*.csv',null,null,null);
aws-blog-event-driven-batch-analytics/resources/edba_redshift.sql (new file, 9 additions)
CREATE TABLE public.vendortranssummary (
"vendor_id" varchar(25),
"item_id" varchar(25),
"trans_date" date,
"sale_amount" numeric(5,2),
"tax_amount" numeric(5,2),
"discount_amount" numeric(5,2)
)
compound sortkey(vendor_id,item_id,trans_date);
aws-blog-event-driven-batch-analytics/resources/sampledata/ sample transaction file (new file, 8 additions)
v101 450.00 4 i101 2016-06-01 some-data some-data some-data
v101 450.00 5 i101 2016-06-01 some-data some-data some-data
v101 450.00 6 i301 2016-06-01 some-data some-data some-data
v101 450.00 4 i301 2016-06-01 some-data some-data some-data
v101 450.00 5 i101 2016-06-01 some-data some-data some-data
v101 450.00 6 i101 2016-06-01 some-data some-data some-data
v101 450.00 4 i301 2016-06-01 some-data some-data some-data
v101 450.00 13 i201 2016-06-01 some-data some-data some-data