Data Streaming Pipeline that sends tweets and images to an Azure CosmosDB via APIM and Azure Functions, with visualization in PowerBI
This project is a prototype of a data pipeline that emulates streaming Twitter messages to a Microsoft Azure REST API endpoint. The streaming data is stored in an Azure Blob Storage container and pushed to an Azure Event Hubs instance, then persisted in an Azure CosmosDB database and finally consumed in Power BI.
Here's a high-level overview of the end-to-end pipeline architecture:
- Connect: API Management
- Process: Azure Functions
- Buffer: Event Hub
- Store and Serve: Blob Storage
- Database: CosmosDB
- Consume: Power BI
Azure Functions is an event-driven, compute-on-demand serverless offering that hosts a single method or function, written in a supported programming language, that runs in response to an event. The programming language used here is Python. There are two Azure Functions in this pipeline: one is triggered by an API POST request, and the other is triggered by a push into Event Hubs.
Azure API Management is a fully managed service that helps customers to securely expose their APIs to external and internal customers. It serves as a gatekeeper and front door for API implementations, enabling frictionless consumption by developers. APIM comes with features like:
- Securing your API, including setting quotas and request throttling
- A developer portal for users of your API, which includes documentation and security key management.
- Self-service subscription for API users
- Monitoring usage of your API
Event Hubs represents the front door for an event pipeline and is often used as an event ingestor in solution architectures. An event ingestor is a component or service that sits between event publishers and event consumers to decouple the production of an event stream from the consumption of those events. Event Hubs provides a unified streaming platform with a time-retention buffer, decoupling event producers from event consumers.
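For illustration, here is a minimal sketch of publishing an event with the azure-eventhub Python SDK. The connection string and hub name are placeholders, and in this pipeline the events are actually pushed by the Azure Function's Event Hubs output binding rather than by a standalone producer:

```python
from azure.eventhub import EventHubProducerClient, EventData

# Placeholder connection string and hub name -- not the pipeline's actual values.
producer = EventHubProducerClient.from_connection_string(
    conn_str="Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<policy>;SharedAccessKey=<key>",
    eventhub_name="tweets",
)

with producer:
    batch = producer.create_batch()                        # events are sent in batches
    batch.add(EventData('{"id": "1", "tweet": "example tweet"}'))
    producer.send_batch(batch)                             # decoupled consumers read these later
```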
CosmosDB is a fully managed NoSQL database for modern application development. It offers several API options depending on the use case, such as key-value, wide-column, graph, and document databases. For new projects, Microsoft Azure recommends the core (NoSQL) API.
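As a minimal illustration of the core (NoSQL) API, the sketch below upserts a document with the azure-cosmos Python SDK. The account URL, key, database, and container names are placeholders; the pipeline itself writes through a Cosmos DB output binding, sketched later:

```python
from azure.cosmos import CosmosClient

# Placeholder endpoint, key, and names -- replace with your own account details.
client = CosmosClient("https://<account>.documents.azure.com:443/", credential="<primary-key>")
container = client.get_database_client("tweetsdb").get_container_client("tweets")

# The item must contain the container's partition key field (assumed here to be /id).
container.upsert_item({"id": "1", "tweet": "example tweet"})
```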
The data is sourced from 2 Kaggle projects:
- Hurricane Harvey Tweets: tweets containing "hurricane harvey" in the message, from the morning of the tropical storm event (20 August 2017) up to when it became a hurricane and was then downgraded back to a tropical storm.
- Satellite Images of Hurricane Damage: satellite images from Texas after the hurricane. For machine learning purposes, the dataset is divided into two labels: photos that show damage and photos that do not.
Original data is sourced from this location
- WSL2 (Ubuntu 22.04) environment for Windows users
- VSCode with the Microsoft Azure and Remote Containers extensions
- Python
- A Microsoft Azure Account with minimal spending
- Docker and Docker-Compose
The goal of this phase is to create a JSON file that contains tweet messages merged with images, with the images first encoded into base64 format. These merged tweet-and-image messages are then sent as a request body to the REST API in Azure APIM. Python scripts have been prepared to perform the following tasks, which can be orchestrated on a regular schedule to produce either batch or streaming data, depending on the use case of the prototype (a sketch of the preprocessing and merge step follows the list below).
- Preprocess the tweets, and convert them to JSON format
- Preprocess the images and convert them to JSON format
- Merge images and messages into a single JSON schema
- Push merged tweets to API to simulate streaming data
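A minimal sketch of the preprocessing and merge step is shown below. The field names and file paths are illustrative, not the exact schema used by the project's scripts:

```python
import base64
import json
from pathlib import Path

def encode_image(path: str) -> str:
    """Read an image file and return its contents as a base64 string."""
    return base64.b64encode(Path(path).read_bytes()).decode("utf-8")

def merge(tweet: dict, image_path: str) -> dict:
    """Attach a base64-encoded image to a tweet record."""
    return {**tweet, "image_b64": encode_image(image_path)}

if __name__ == "__main__":
    # Illustrative tweet record and image path, not the project's actual schema.
    tweet = {"id": "1", "time": "2017-08-20T08:00:00Z", "tweet": "Tropical storm Harvey approaching"}
    record = merge(tweet, "images/damage_00001.jpeg")
    Path("merged_tweets.json").write_text(json.dumps([record], indent=2))
```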
- Develop the Azure Function in Python using the VSCode extension, which includes the Azure Functions SDK, Docker CLI, and related dependencies. The code ingests the body of the HTTP request and validates the JSON message against a predefined JSON schema (see the sketch after this list).
- Test function in Azure Function App from local project workspace.
- Test the function with the preprocessed dataset.
- Deploy Azure Function App from your local project workspace.
- Test function in Azure, then implement function type authorization to secure your API.
- Bind the function output to an Azure Blob Storage container. This can be done using the VSCode extension functionality that adds the binding configuration to the function.json file.
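Below is a minimal sketch of what the HTTP-triggered function could look like in the v1 Python programming model. The schema, the binding name outputBlob, and the field names are assumptions; the blob output binding itself is declared in function.json:

```python
import json
import logging

import azure.functions as func
from jsonschema import ValidationError, validate

# Assumed schema: a tweet with an optional base64-encoded image attached.
TWEET_SCHEMA = {
    "type": "object",
    "properties": {
        "id": {"type": "string"},
        "tweet": {"type": "string"},
        "image_b64": {"type": "string"},
    },
    "required": ["id", "tweet"],
}

def main(req: func.HttpRequest, outputBlob: func.Out[str]) -> func.HttpResponse:
    # Parse and validate the request body against the predefined schema.
    try:
        payload = req.get_json()
        validate(instance=payload, schema=TWEET_SCHEMA)
    except (ValueError, ValidationError) as err:
        logging.error("Invalid request body: %s", err)
        return func.HttpResponse("Invalid JSON payload", status_code=400)

    # The blob output binding declared in function.json writes the message
    # to the configured Blob Storage container.
    outputBlob.set(json.dumps(payload))
    return func.HttpResponse("Message accepted", status_code=200)
```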
- Create a new Azure API Management instance
- Add API to APIM to expose and protect the Azure Function as a backend
- Test it from within the APIM and locally from Insomnia
- Add a basic authentication policy to the default subscription key to secure the API. This is done by: a. creating a key vault, b. enabling access to key vault secrets, c. adding a username and password, d. creating named values in APIM with the key vault URL, e. adding an inbound policy definition, f. testing a request with a basic authorization header
- Test it out from your API and imported Function App, as well as from your Python program, push_tweets.py (as sketched below)
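A hedged sketch of such a test request from Python is shown below. The APIM URL, subscription key, and credentials are placeholders; the subscription key travels in APIM's default Ocp-Apim-Subscription-Key header, alongside a standard Basic authorization header:

```python
import base64

import requests

# Placeholder endpoint and secrets -- substitute your own APIM values.
APIM_URL = "https://<apim-instance>.azure-api.net/tweets"
SUBSCRIPTION_KEY = "<subscription-key>"
USERNAME, PASSWORD = "<username>", "<password>"   # stored in Key Vault in the pipeline

headers = {
    "Ocp-Apim-Subscription-Key": SUBSCRIPTION_KEY,
    "Authorization": "Basic " + base64.b64encode(f"{USERNAME}:{PASSWORD}".encode()).decode(),
    "Content-Type": "application/json",
}

response = requests.post(APIM_URL, json={"id": "1", "tweet": "example tweet"}, headers=headers)
print(response.status_code, response.text)
```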
- Create an Event Hubs namespace - standard tier
- Create an Event Hub with 2 partitions and a 1-day message retention
- Capture events using the first Azure Function and Azure Event Hubs Capture
- Include an Azure Event Hubs binding in the Azure Function.
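Assuming the v1 Python model, adding the Event Hubs output binding amounts to one more func.Out parameter on the first function; the binding name outputEventHubMessage is an assumption and must match the entry in function.json:

```python
import json

import azure.functions as func

# Same HTTP-triggered function as sketched earlier, now fanning out to two bindings.
def main(req: func.HttpRequest,
         outputBlob: func.Out[str],
         outputEventHubMessage: func.Out[str]) -> func.HttpResponse:
    body = json.dumps(req.get_json())     # schema validation omitted here for brevity
    outputBlob.set(body)                  # archive the raw message in Blob Storage
    outputEventHubMessage.set(body)       # push the same message into the Event Hub
    return func.HttpResponse("Message accepted", status_code=200)
```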
Create a CosmosDB instance and a second Azure Function to pull tweets from Event Hubs into CosmosDB
- Create a database with one container
- Create an Azure Function to take tweets from the Event Hubs instance and write them to CosmosDB (a sketch follows below)
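A minimal sketch of the second function, assuming the v1 Python model with an Event Hubs trigger and a Cosmos DB output binding (the parameter names event and doc are assumptions tied to function.json):

```python
import json

import azure.functions as func

def main(event: func.EventHubEvent, doc: func.Out[func.Document]) -> None:
    # Decode the event body produced by the first function and hand it to the
    # Cosmos DB output binding, which upserts the document into the container.
    message = json.loads(event.get_body().decode("utf-8"))
    doc.set(func.Document.from_dict(message))
```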
In this step, we create a live connection in Power BI to Azure CosmosDB and watch real-time, interactive changes as streaming messages are generated from the Python scripts, flow through the Azure Functions into Event Hubs, and finally land in CosmosDB.
Power BI Dashboard report on Power BI Desktop
Logging information from Function App
Live metrics from Azure Function processing of streaming API calls
Application Insights Monitoring Dashboard
PowerBI Dashboard view in PowerBI service workspace
PowerBI Lineage information showing data sourced from CosmosDB