ELT with Mapping Dataflows
Task 2: Create Data warehouse tables for SmartFoods in Azure SQLDB
Task 3: Introduction to Mapping Data Flows
Slowly changing dimension type 2 with Mapping Dataflow (customerDim)
Task 1: Create a new Mapping Dataflow and add source dataset
Task 2: Add a parameter to Mapping Dataflow
Task 3: Break the Name field into firstName and lastName fields
Task 4: How to save (Publish) your Dataflow?
Task 5: Remove extra columns and Rename columns using “Select” transformation
Task 6: Calculate MD5 Hash of all non-key columns
Task 7: Add DW table source
Task 8: Compare staging records with DW records to identify updates and inserts
Task 9: Identify Updates/Inserts using “Conditional Split” transformation
Task 10: Handling New records (New stream)
Task 11: Handling Changed records (Changed stream)
Task 12: Putting all inserts (“New” and “Changed”) together
Task 13: Generate Surrogate Keys
Task 14: Fix surrogate key value
Task 15: Add Batch columns to our “Insert” dataset
Task 16: Put Insert and Update records together
Task 17: Prepare the dataset for sink (Alter row transformation)
Task 18: Writing to destination DW table
Task 19: Preview the final dataset (Debug)
Task 20: Inspect the Dataflow “Script”
Task 21: Building the pipeline for the dataflow
Task 22: Debug the pipeline manually
Task 23: Enhance the pipeline
Task 24: (Challenge Task) Create an initial load pipeline for this DF
Solution requirements:
In the previous exercise you ingested customer, orders and orderlines data from WWI to Blob storage. You also ingested Customer, Transactions and reference data from the SmartFoods systems to Blob storage. In this exercise you will use ADF Mapping DF to cleanse, transform, enrich and store this data so it can be served to business users through Power BI. In addition, data needs to be prepared for the SmartFoods customer-facing application, which displays accumulated loyalty points and comprehensive nutritional information and suggestions.
Analytics Reporting:
Since WWI business users are keen to set up a self-serve reporting environment, the serving-layer storage solution should support the following requirements:
- Role-based access control plus row- and column-level security, so data can be made available to all users while controlling at group level which rows and columns are available to each user group.
- Dynamic Data Masking, so certain PII can be masked for certain user groups while they still have access to the rest of the data.
With the above requirements, and considering this is only a POC, they decided to use Azure SQL Database for the serving-layer storage solution. The team acknowledges that after a successful POC they will move this part of the solution to Azure Synapse Analytics.
Data Science:
In addition, the data science team decided to use Azure ML services to build ML/AI applications, particularly to support the nutritional suggestions based on the SmartFoods data. Hence, to avoid the need to export the cleansed data from SQL DB, they requested that the data also be stored in Blob storage at loading time, if possible. After considering these requirements it was decided to use Parquet files on Azure Blob Storage (after the POC, to be replaced with Azure Data Lake Storage Gen2).
SmartFoods App:
Finally, the SmartFoods application (Web and Mobile) will need to access the data through an API, and its primary requirements are performance, scalability and availability. After considering these requirements it was decided to use Azure Cosmos DB for the application data storage.
In the Blob container we copied for SmartFoods there are multiple CSV files which represent SmartFoods’ reference data for the transactions that come through the HTTP API.
Here is the initial star schema we are building for SmartFoods DW. Later we will also introduce some aggregate tables for easier reporting.
Either using the Query Editor in the Azure Portal or using SSMS, connect to your Azure SQL DB and create the schema for the SmartFoods DW and all the tables by running the following SQL script.
Note: You may need to add your Client IP Address to your SQL DB through “Set Firewall” page.
CREATE SCHEMA SmartFoodsDW;
GO
CREATE TABLE [SmartFoodsDW].[customerDim](
[CustomerKey] [bigint],
[LoyaltyNum] [nvarchar](max),
[FirstName] [nvarchar](max) NULL,
[LastName] [nvarchar](max) NULL,
[City] [nvarchar](max) NULL,
[State] [nvarchar](max) NULL,
[Email] [nvarchar](max) NULL,
[Address] [nvarchar](max) NULL,
[PostCode] [nvarchar](max) NULL,
[MemberSince] [date] NULL,
[Dob] [date] NULL,
[RecInsertDt] [date] NULL,
[RecStartDt] [date] NULL,
[RecEndDt] [date] NULL,
[RecCurrInd] [bit] NULL,
[sourceLineage] [nvarchar](max),
[RecMd5Hash] [nvarchar](max)
) ;
GO
CREATE TABLE [SmartFoodsDW].[foodDim](
[sku] [nvarchar](max),
[foodKey] [bigint],
[desc] [nvarchar](max) NULL,
[foodGroup] [nvarchar](max) NULL,
[RecInsertDt] [date] NULL,
[RecStartDt] [date] NULL,
[RecEndDt] [date] NULL,
[RecCurrInd] [bit] NULL,
[sourceLineage] [nvarchar](max),
[RecMd5Hash] [nvarchar](max)
) ;
GO
CREATE TABLE [SmartFoodsDW].[foodNutDim](
[foodKey] [bigint],
[nutrientId] [nvarchar](max),
[nutritionValue] [float] NULL,
[desc] [nvarchar](max) NULL,
[nutUnit] [nvarchar](60) NULL,
[RecInsertDt] [date] NULL
);
GO
CREATE TABLE [SmartFoodsDW].[invoiceLineTxn](
[invoiceNumber] [nvarchar](max),
[lineNumber] [int],
[foodKey] [bigint],
[itemDesc] [nvarchar](max) NULL,
[itemFoodGroup] [nvarchar](max) NULL,
[uPrice] [float],
[qty] [bigint],
[gst] [float],
[lineTotalGstExc] [float],
[lineTotalGstInc] [float],
[sourceLineage] [nvarchar](max),
[recInsertDt] [date]
);
GO
CREATE TABLE [SmartFoodsDW].[invoiceTxn](
[invoiceNumber] [nvarchar](max),
[loyaltyNum] [nvarchar](max) NULL,
[CustomerKey] [bigint] NULL,
[store] [nvarchar](max) NULL,
[State] [nvarchar](max) NULL,
[lineItemCount] [bigint],
[invoiceTotalGSTinc] [float],
[invoiceTotalGSTexc] [float],
[InvoiceGst] [float],
[timestamp] [datetime2](7),
[sourceFileLineage] [nvarchar](max),
[recInsertDt] [date]
) ;
GO
Mapping Data Flows is a new feature of Azure Data Factory that allows you to build data transformations in a visual user interface (code-free, or with very little coding).
Mapping data flows are visually designed data transformations in Azure Data Factory. Data flows allow data engineers to develop graphical data transformation logic without writing code. The resulting data flows are executed as activities within Azure Data Factory pipelines that use scaled-out Apache Spark clusters. Data flow activities can be engaged via existing Data Factory scheduling, control, flow, and monitoring capabilities. More info in this article.
Mapping data flows provide an entirely visual experience with no coding required. Your data flows run on your execution cluster for scaled-out data processing. Azure Data Factory handles all the code translation, path optimization, and execution of your data flow jobs.
ADF translates the flow built in the visual interface into Apache Spark code, which runs on a serverless Spark cluster that we can configure in terms of the count and type of worker nodes.
Serverless Spark cluster: The Apache Spark cluster is deployed on the Azure Integration Runtime and, like the Azure IR, it is serverless: the cluster is fully managed by Azure and charged per second of job run time.
Mapping DF on a Self-Hosted IR? ADF Spark clusters are only deployable on Azure; currently there is no option for deploying them on-premises.
Data flow canvas: Here is what the Data flow canvas looks like. It is separated into three parts.
The Graph: The graph displays the transformation stream. It shows the lineage of source data as it flows into one or more sinks. To add a new source, select Add source. To add a new transformation, select the plus sign on the lower right of an existing transformation.
You loaded SmartFoods’ customer staging data from API to Blob storage in CSV format in the following location “smartfoodsstaging/customer/smartfoods_customers_<date>.csv”
The final table will look like this:
Field | Description | Source |
---|---|---|
CustomerKey bigint – primary key | Surrogate key | Generated by ELT |
LoyaltyNum nvarchar | Source key | Existing field in source |
FirstName nvarchar | From Name field | Generated by ELT |
LastName nvarchar | From Name field | Generated by ELT |
City nvarchar | | Existing field in source |
State nvarchar | | Existing field in source |
Email nvarchar | | Existing field in source |
Address nvarchar | | Existing field in source |
PostCode nvarchar | | Existing field in source |
MemberSince date | | Existing field in source |
Dob date | | Existing field in source |
RecInsertDt date | Actual ELT running date | Generated by ELT |
RecStartDt date | Record validity start date = batch date | Generated by ELT |
RecEndDt date | Record validity end date = batch date | Generated by ELT |
RecCurrInd boolean | Record validity indicator | Generated by ELT |
sourceLineage nvarchar | Name of source file | Generated by ELT |
RecMd5Hash nvarchar | MD5 hash of all source fields except the natural key | Generated by ELT |
- Create a Mapping Dataflow by clicking the new Data flow button and rename it to “SmartFoodsCustomerELT”
- At the top of the page turn on “Data flow debug” -> select AutoResolveIntegrationRuntime
Debug mode: Azure Data Factory Mapping Data Flow’s debug mode allows you to interactively watch the data shape transform while you build and debug your data flows. The debug session can be used both in data flow design sessions and during pipeline debug execution of data flows. To turn on debug mode, use the “Data Flow Debug” button at the top of the design surface. For more information check this article.
Note: By turning on Debug mode, ADF deploys a Spark cluster in the same region as your Integration Runtime.
- Click “Add Source” on the canvas
- Change the output stream name to “SmartFoodsCustomerStagingBlob”
- Select the ‘SmartFoodsDelimitedTextBlob’ dataset
- Click “Debug Settings” on the top task bar
- Under “General” increase “Row limit” to 10,000
Row Limit: This is the maximum number of rows that will be retrieved from the source dataset when we preview the transformation results. If you are working with multiple datasets that need to be joined, it is best to increase this to a higher limit; otherwise the join result in “preview” will have a lot of missing records.
- Under source options change “Column to store file name” to “sourceLineage”
Note: This enables the option to take the processing file name and pass it on as a column for every row in the dataset. This is really valuable information for testing, lineage and debugging.
- Under parameters provide:
- “folder” = customer
- “file” = smartfoods_customers_20200101
- “fileType” = csv
Where did the parameters come from? We are reusing the parameterized dataset created previously for importing data from the source systems to Blob storage. These are the parameters that dataset needs to operate.
- Go to the “Projection” tab
- Click “Import projection”
- Change the data type for the “PostCode” column from “short” to “string”
Note: When we load data from a delimited text or flat file, it is always recommended to double-check the schema ADF detected and fix it if needed.
Why change PostCode from short to string? 1. The destination sink (Azure SQL DB) does not recognize values of type short. 2. Postcodes can have a leading 0, which gets eliminated in non-string fields.
- Now go to “Data Preview” tab and refresh it to get a preview of the dataset
Debug Cluster: We need “Debug” mode running for 1. Importing data projection (schema) 2. Running preview task.
Like ADF pipelines and datasets, Mapping Dataflows also provide the option to create parameters. Dataflow parameters can be used within the dataflow itself and can be filled in by an ADF pipeline (more on both later). For now, let’s create two parameters.
- Click on any white space of the DF canvas
- Click on the parameters tab
- Create two parameters:
- MaxCustomerKey -- integer
- BatchDt -- string
- Click the plus sign on the lower right-hand side of the source transformation to add the next transformation.
- Select the “Derived column” transformation
- Rename it to “AddFirstNameLastName”
- Add a new column named “FirstName”; for the value, click the box on the right to open the expression editor and enter the following expression:
split(name," ")[1]
- Click refresh to see the result of the expression on the data.
- Create another column for “LastName” using the expression below:
split(name," ")[2]
At this stage, if you try validating or publishing the dataflow you will get an error like the one below.
ADF Mapping Dataflows do not allow publishing a DF that is incomplete, and every dataflow must have at least one “Sink” transformation to be considered complete.
To get around this issue and be able to save your work in progress, you can add a dummy “Sink” transformation to the end of your flow, publish your work, and then remove it and continue building the flow until you reach the point where you add the actual “Sink” transform.
- After the “Source” Transform add a “New branch” transform
- Add a “sink” transform after the “New branch”
- Set up the sink as below:
Note1: As this AzureSqlTable1 dataset is parameterized, it will add two new parameters to the debug settings. Just fill those params with any random string (it will not affect your dataflow).
Note2: DO NOT forget to Remove the “New Branch” and dummy “Sink” transforms once you have the final actual “Sink” transform.
- Add a “Select” transformation after the previous transformation to:
- Remove the “name” column in favor of “firstName” and “LastName”.
- Rename the columns: we are going to compare this data with the existing data in the dimension to identify changed and newly added rows, so for easier identification we add an ‘i’ in front of all staging column names.
- Rename your “Select” transformation to “FixColumnNamesRemoveName”
- Set it up as in the screenshot below
- Preview the output of this transformation
Note: If you are not familiar with Slowly Changing Dimensions in Data Warehousing read this article in Wikipedia https://en.wikipedia.org/wiki/Slowly_changing_dimension
We are trying to build an SCD type 2 for the customers table. This means no data will be discarded from the DW table. If a record changes in the OLTP source system, the change is captured by adding a new record with the updated values and marking the old record as inactive via recCurrInd (usually referred to as closing the record), plus setting the date of the record closure (recEndDt).
There are multiple techniques to identify updated records; one is to compare the MD5 hash of the existing record against the newly received record from the source, and if the hashes do not match the record is considered changed.
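For readers who think in SQL, the sketch below shows roughly the same hash-compare pattern in T-SQL. It is illustrative only – the dataflow we are about to build implements this logic visually – and ‘stg_customer’ is a hypothetical staging table holding the incoming file.
-- Illustrative T-SQL sketch of the SCD2 hash-compare pattern (not part of the lab).
DECLARE @BatchDt date = '2020-01-01';
-- Close the current DW record when the incoming record's hash differs.
UPDATE d
SET    d.RecEndDt   = @BatchDt,
       d.RecCurrInd = 0
FROM   SmartFoodsDW.customerDim AS d
JOIN   stg_customer AS s          -- hypothetical staging table
       ON s.LoyaltyNum = d.LoyaltyNum
WHERE  d.RecEndDt IS NULL
  AND  s.RecMd5Hash <> d.RecMd5Hash;
-- A new current version is then inserted for changed customers, and brand-new
-- customers are inserted as well; the dataflow handles both insert streams.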
- Add a ‘Derived column’ transformation to calculate the MD5 Hash of all non-key columns
- Rename it to “MD5Hash”
- Add a column “iRecMd5Hash”
- For the expression use:
md5( iif(isNull(iEmail),'',toString(iEmail))+
iif(isNull(iDob),'',toString(iDob))+
iif(isNull(iAddress),'',toString(iAddress))+
iif(isNull(iCity),'',toString(iCity))+
iif(isNull(iState),'',toString(iState))+
iif(isNull(iPostCode),'',toString(iPostCode))+
iif(isNull(iMemberSince),'',toString(iMemberSince))+
iif(isNull(iFirstName),'',toString(iFirstName))+
iif(isNull(iLastName),'',toString(iLastName)))
Expression explanation: For every non-key column we first replace Nulls with an empty string, then convert all fields to string and concatenate them together. Finally, the md5 function calculates the hash of the whole concatenated string.
- Preview the output
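If you later want to sanity-check hashes from the SQL side, a roughly equivalent T-SQL expression is shown below. This is illustrative only: date formatting, string encoding and letter case differ between the Spark-based dataflow and SQL Server, so the values will not necessarily match the dataflow’s md5() output byte for byte.
-- Illustrative T-SQL only: null-safe concatenation of the non-key columns, then MD5.
SELECT LoyaltyNum,
       LOWER(CONVERT(varchar(32),
             HASHBYTES('MD5',
               COALESCE(Email, '')
               + COALESCE(CONVERT(varchar(10), Dob, 23), '')
               + COALESCE([Address], '')
               + COALESCE(City, '')
               + COALESCE([State], '')
               + COALESCE(PostCode, '')
               + COALESCE(CONVERT(varchar(10), MemberSince, 23), '')
               + COALESCE(FirstName, '')
               + COALESCE(LastName, '')),
             2)) AS sqlSideMd5Hash
FROM   SmartFoodsDW.customerDim;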
To perform the comparison and identify updated or new records, we also need to retrieve the existing data from the DW dimension table (on the first run the table is empty, hence every row will be determined as new).
- On the far left-hand side of the canvas, under the first source, click “Add source” to add a new source to the flow.
- Rename it to “SmartFoodsCustomerSQLDW”
- For the dataset select “AzureSqlTable1” (you created this SQL dataset at the beginning of the lab)
- Go to “Debug Settings” and provide the parameters below:
- Go to Projection tab and import the dataset projection
- Add a filter transformation after the DW source
Note: The filter transformation is used to only get ‘active’ rows (RecEndDt is Null) from the table.
- Rename it to “CurrentRecordsOnly”
- Filter on:
isNull(RecEndDt)
The next step is to compare the records from staging and the DW. We use a “Join” transformation for this purpose. As the staging dataset will have records that do not exist in the DW dataset, we need to use a “left outer join”. The pseudocode for the join is:
SELECT * FROM staging s
LEFT OUTER JOIN
dw_table d
ON
s.key = d.key
- Add a “Join” transformation after the “MD5Hash” transformation
- Rename it to “JoinStagingToDWDim”
- For the right stream select “CurrentRecordsOnly”
- Join type: “Left outer”
- Join conditions: iLoyaltyNum == LoyaltyNum
Note: Since we renamed the staging columns with an ‘i’ in front of them, it is quite easy to find the right columns for the join here.
A “Conditional Split” transformation allows us to split an incoming dataset into multiple outgoing datasets based on some logical criteria.
Here we need to find out:
- if a record is new (the right-hand side of the left outer join [records from the DW] is null);
- if a record has changed (the primary key exists in both the staging and DW datasets but the MD5 hashes do not match);
- if a record has not changed (the primary key exists in both the staging and DW datasets and the MD5 hashes match).
A SQL sketch of this branching follows the split conditions below.
- Add a “Conditional split” transformation after the “JoinStagingToDWDim” transformation.
- Rename it to “SDC2Split”
- Set the “Split on” option to “First matching condition”
Split on: If we set this to “First matching condition”, a record is pushed to the stream of the first condition it matches, and the conditions after that are not tested against the record. This option is more efficient to process but has two implications: 1. the order of conditions becomes important (stricter conditions should be placed above less strict ones); 2. every record is passed into only a single stream. If your workflow logic requires input records to be passed into multiple output streams, choose “All matching conditions”.
- Split Condition
- New:
isNull(LoyaltyNum)
- Changed:
!(isNull(LoyaltyNum)) && (iRecMd5Hash !=RecMd5Hash)
- Unchanged:
!(isNull(LoyaltyNum)) && (iRecMd5Hash == RecMd5Hash)
So far, this is how your dataflow should look. (The conditional split added 3 streams to our flow whose output we need to manipulate – we will refer to these streams as New, Changed and Unchanged.)
The first stream out of the conditional split is “New”. For every new record we only need to insert a new record into the table.
- Add a “Select” transformation after the “New” stream.
Note: This Select is going to perform two tasks: 1. select the columns coming from staging and remove all other columns (the Join transform added columns from both staging and the DW to our dataset); 2. remove the extra ‘i’ we added to the front of the staging columns to identify them more easily.
- Rename it to “SelectNewInsert”
- Use the screenshot below as a guide for setting up the “Select” transformation.
Note: The total number of columns selected is 12. Only columns with a leading ‘i’ will be selected, and the output names will not have the leading ‘i’.
The second stream out of the conditional split is “Changed”. For changed records we need to insert a new record and update the existing record. So we need to add a “New branch” transformation after “Changed”.
New branch: allows us to replicate the output of a transformation to two streams. Essentially a “New branch” is not a transformation; it only duplicates the output of a stream.
This is how our flow will look:
Updating existing changed records (Closing old records)
- After the first “Changed” stream add a “Select” transform and rename it to “SelectChangedUpdate”
This stream is going to perform the updates for existing records.
- We will select a total of 14 columns in this Select, as below. Here we select the columns WITHOUT the leading ‘i’ as we want the column values from the DW. The only columns we are updating on these records are 1. RecEndDt 2. RecCurrInd
- After this add a “Derived Column” transform to finally add the two columns we are updating.
- Rename it to “UpdateRecsBatchColumns”
- Add the “RecEndDt” column:
toDate($BatchDt, 'yyyy-MM-dd')
Expression explanation: Here we are using the “BatchDt” parameter (in Dataflow expressions a ‘$’ is used to reference a parameter in an expression). You created this param at the beginning of the exercise. The parameter is of type ‘String’, so we pass it to the ‘toDate’ function to convert it to a date.
Why $BatchDt and not just the ‘currentDate()’ function? Because there is no guarantee the batch always runs on the same date that the record change happened. Especially on initial loading of the data, we would process multiple days’ batches on the same day to catch up with the current date.
- Add RecCurrInd column:
false()
Adding new version of the changed records
- After the second “Changed” stream add a “Select” transform and rename it to “SelectChangedInsert”
- We will select a total of 12 columns in this Select, as below. Here we select the columns WITH the leading ‘i’ as we want the column values from the staging source.
For the “Unchanged” stream we leave it without any transformation after it.
Out of the “New” stream (marked 1 in the figure above) and the “Changed” stream (marked 3 in the figure above) we have two sets of records which need to be inserted into the DW table. The next logical step is to merge the two sets into one using a “Union” transformation.
- After the “SelectNewInsert” transform add a “Union” transformation and rename it to “ALLInserts”
- Union by: Name
- Union with: SelectChangedInsert
Union transformation options: The Union transformation allows merging two datasets either by matching column names (Union by name) or by merging the columns of the two datasets in positional order. “Union with” is where we select the other dataflow stream(s) that we want to union with the current stream. You can add multiple streams by using the plus sign.
- Preview the output of the transform.
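In SQL terms, “ALLInserts” is roughly the following (illustrative only; the two names stand in for the outputs of the “SelectNewInsert” and “SelectChangedInsert” transforms):
-- Illustrative only: union-by-name of the two insert streams.
SELECT * FROM SelectNewInsert
UNION ALL
SELECT * FROM SelectChangedInsert;  -- the dataflow matches columns by name, not position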
For all the records to be inserted in the DW table we need to generate a surrogate key (more information here).
The Surrogate key transformation is essentially a sequential number generator. The “Key column” setting gives the name of the surrogate key column in the output dataset, and the “Start value” designates the integer that the sequence starts at.
- After the “ALLInserts” transform add a “Surrogate key” transformation
- Rename it to “SurrogateKey”
- Key column: CustomerKey
- Start value: 1
The surrogate key issue: With the above settings, every time the dataflow runs a number is generated for each record that passes through the “Surrogate key” transform, starting at 1. So if we leave things as is, the generated surrogate key will be worthless, as it will be duplicated without any relation to the existing records in the DW table.
The solution: To solve this problem we need to know the maximum value of the surrogate key in the DB and add it to the generated number.
At the beginning of this exercise, you created a parameter named “MaxCustomerKey”. The purpose of this parameter is to hold the biggest surrogate key currently in the DW table so it can be added to the generated number.
The parameter gets filled in by the ADF pipeline that the dataflow runs in. Once the dataflow is ready to be placed in a pipeline, we will precede it with a “Lookup” activity which retrieves the maximum “CustomerKey” in the table and passes it to the DF through the parameter (more on this later).
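Put together, the “Surrogate key” transform plus the MaxCustomerKey offset behave roughly like the T-SQL below (illustrative only; ‘inserts’ is a hypothetical table standing in for the ALLInserts stream):
-- Illustrative only: sequential numbers offset by the current maximum key.
DECLARE @MaxCustomerKey bigint =
        (SELECT COALESCE(MAX(CustomerKey), 0) FROM SmartFoodsDW.customerDim);

SELECT ROW_NUMBER() OVER (ORDER BY LoyaltyNum) + @MaxCustomerKey AS CustomerKey,
       *
FROM   inserts;  -- hypothetical stand-in for the ALLInserts stream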
- After the “SurrogateKey” transformation add a “Derived Column” transformation
- Rename it to “AddMaxCustomerKey”
- Set it up like the screenshot:
CustomerKey:
CustomerKey+$MaxCustomerKey
This transformation takes the generated surrogate key sequential number and adds the maximum existing surrogate key value to it.
Our “Insert” dataset is still missing the following 4 columns. In this task a “Derived Column” transformation is used to generate them and add them to the dataset.
Field | Description | Source |
---|---|---|
RecInsertDt date | Actual ELT running date | Generated by ELT |
RecStartDt date | Record validity start date = batch date | Generated by ELT |
RecEndDt date | Record validity end date = batch date | Generated by ELT |
RecCurrInd boolean | Record validity indicator | Generated by ELT |
- Add a “Derived Column” transformation
- Rename it to “InsertRecsBatchColumns”
- Columns:
- RecInsertDt:
currentDate()
- RecCurrInd
true()
- RecStartDt
toDate($BatchDt,'yyyy-MM-dd')
- RecEndDt
toDate(toString(null()))
Now we need to merge the insert stream and the update stream into a single stream in preparation for pushing it to the destination.
- Add a “Union” transform
- Rename it to “UnionInsertUpdates”
- Union with: “UpdateRecsBatchColumns”
So far, your Dataflow should look like below
Note: If you hover over any transformation, it tells you how many output columns are coming out of that transformation. To double-check your work, the last transformation should have 16 columns.
Now our dataset is ready to be written to the DW table. The next logical step is to add a “Sink” transform and complete the data flow. Let’s see what happens if we go ahead and add the sink transform.
A sink transformation that writes to a database (or DW) and needs to perform anything other than an insert (update, upsert or delete) on the destination table requires an additional transformation to mark each row with the type of operation the “Sink” transform is expected to perform. That transformation is called “Alter row”.
Alter row: The Alter row transformation is used when the sink is an RDBMS (DB or DW) and is expected to perform updates, upserts or deletes.
- After the “UnionInsertUpdates” transform add an “Alter row” transform
- Rename it to “MarkRow”
- Add two Alter row conditions:
- Update if
!isNull(RecEndDt)
- Insert if
isNull(RecEndDt)
Update if: this is an existing record being closed (RecEndDt IS NOT NULL)
Insert if: this is a new record being inserted (RecEndDt IS NULL)
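To make the marking concrete, here is what the sink will effectively do for each marked row once it is configured in the next task (illustrative T-SQL only; @CustomerKey and @BatchDt are hypothetical stand-ins for the row’s values):
-- Illustrative only: the effect of the two Alter row policies at the sink.
DECLARE @CustomerKey bigint = 42, @BatchDt date = '2020-01-01';

-- A row marked by "Update if" closes the existing dimension record,
-- matched on the sink's key column (CustomerKey):
UPDATE SmartFoodsDW.customerDim
SET    RecEndDt   = @BatchDt,
       RecCurrInd = 0
WHERE  CustomerKey = @CustomerKey;

-- A row marked by "Insert if" is simply appended as a new current record.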
Now the dataflow is ready to add the Sink Transformation and write to destination table.
- After the “MarkRow” transform add a “Sink” transform
- Rename it to “DBSink”
- Dataset: AzureSqlTable1
- Go to the “Settings” tab
- Tick “Allow Insert”
- Tick “Allow Update”
- Key columns: “CustomerKey”
Reminder: Delete the “New Branch” and dummy “Sink” transforms!
The final task before publishing the dataflow is to have a look at the final dataset through “Data preview” in the “Sink” transform.
- Go to “Debug Settings” and make sure your parameters match the screenshots below.
- In “DBSink” Under “Data Preview” click “Refresh”
In the figure above, number 3 shows the total number of rows that are going to go through this transformation. In “Debug” mode this number is capped at either the “Row limit” or the actual number of rows in the source. Here our “Row limit” is 10,000 and the file has 5,000 rows in it. Number 4 indicates what type of action is being performed on each of the preview rows; the green plus sign indicates an “insert”.
Note: Running “Data preview” on “Sink” transform WILL NOT actually write anything to the destination. It only previews what the write would look like. ADF Mapping DF “Debug” will not perform any action on “Sink” transformation until the DF is placed in an ADF pipeline and debugged from there.
ADF Mapping DF generates a script for the GUI changes we make. You can access and update this script by clicking the “Script” button on the top ribbon.
Note: In the Github repo under ./Hands-on-lab/Part2/Appendix there is a file named “SmartFoodsCustomerELTScript.txt” with the complete solution script. There are instructions inside the file on how to use it.
Now that the dataflow is published the next task is to create a pipeline and add the dataflow to it.
- Create a pipeline
- Rename it to “SmartFoodsCustomerELTBlobToSql”
- Under “Move & Transform” drag a “Data flow” activity onto the pipeline.
- Select “SmartFoodsCustomerELT” from the list of dataflows
- The “Settings” tab of the DF activity should look like the figure below. All the dataset parameters are surfaced on the “Settings” tab, and the dataflow’s parameters are under the “Parameters” tab.
As a first step, we will debug the pipeline and enter the parameters manually. Then we will modify the pipeline to a more production-ready state.
- From the “Parameters” tab click on the “MaxCustomerKey” value and select pipeline expression
- Fill in the dataset parameters as:
- Fill in the dataflow parameters as:
- Click “Debug”.
- Examine the dataflow monitoring by clicking the “eyeglasses” icon (more on Dataflow monitoring later)
- Once the debug finishes successfully examine the table contents either through SQLDB “Query editor” or using SSMS.
Once we have debugged the pipeline with manual parameters successfully, the next step is to automate filling in those parameters, like what we have done for the pipelines loading data from the source.
- Add a “Lookup” activity before the dataflow activity to retrieve the maximum CustomerKey
- Rename it to “MaxCustomerKey”
- Set up the lookup as below:
Query:
select coalesce(max(CustomerKey),0) maxSK from SmartFoodsDW.customerdim
- Create a pipeline parameter named BatchDt
- Go to the DF activity and set up the dataset parameters as:
- Folder: customer
- File: smartfoods_customers_@{replace(pipeline().parameters.BatchDt,'-','')}
- Filetype: csv
- Schema: SmartFoodsDW
- tableName: customerDim
- Schema: SmartFoodsDW
- tableName: customerDim
Note: Schema and tableName appear twice because the parameterized AzureSqlTable1 dataset is used twice in the dataflow (as the DW source and as the sink).
- Under the “Parameters” tab set up the dataflow parameters as:
- MaxCustomerKey -> pipeline expression
@activity('MaxCustomerKey').output.firstRow.maxSK
- BatchDt -> pipeline expression
@pipeline().parameters.BatchDt
- Now try debugging this pipeline; the only parameter that needs to be passed is the “BatchDt” pipeline parameter.
Create another pipeline to run multiple dates of the Customer ELT pipeline in a loop
Hint 1: Create an array variable with the values:
["2020-01-01","2020-01-02","2020-01-03","2020-01-04","2020-01-05","2020-01-06"]
Hint 2: Use a ForEach loop.
Hint 3: The ForEach loop must run sequentially, not in parallel. (Why?)
Hint 4: Inside the loop use an Execute Pipeline activity.
Hint 5: If you need extra help, check the screenshots below.