A Kafka source connector that receives data from REST APIs and publishes it to Kafka. An extended version supports the Fitbit APIs.

RADAR-base/RADAR-REST-Connector

Kafka Connect REST Source and Fitbit Source

This project contains a Kafka Connect source connector for a general REST API, and one for Fitbit in particular. The general Kafka Connect REST source is not yet documented.

Fitbit source connector

Installation

This repository relies on a recent version of Docker and docker-compose, as well as an installation of Java 17 or later.

Usage

Generally, this component is installed with RADAR-Kubernetes. It uses the Docker image radarbase/kafka-connect-rest-fitbit-source.

First, register a Fitbit App with Fitbit. It should be either a server app, for multiple users, or a personal app for a single user. With the server app, you need to request access to intraday API data.

For every Fitbit user you want access to, copy docker/fitbit-user.yml.template to a file in docker/users/. Get an access token and refresh token for the user using, for example, the Fitbit OAuth 2.0 tutorial page.
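The copy step above can be scripted when there are several users. The following is a minimal sketch, not part of this repository: the function name `create_user_files` and the `<name>.yml` file naming are assumptions made for illustration. The tokens still have to be filled in by hand afterwards.

```python
import shutil
from pathlib import Path


def create_user_files(user_names,
                      template="docker/fitbit-user.yml.template",
                      users_dir="docker/users"):
    """Copy the user template once per user name (hypothetical helper).

    Returns the list of files that were newly created.
    """
    template_path = Path(template)
    target_dir = Path(users_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    created = []
    for name in user_names:
        # Assumption: one file per user, named after the user.
        target = target_dir / f"{name}.yml"
        if target.exists():
            # Never overwrite a file that may already hold tokens.
            continue
        shutil.copyfile(template_path, target)
        created.append(target)
    return created
```

Calling `create_user_files(["alice", "bob"])` from the repository root would create docker/users/alice.yml and docker/users/bob.yml if they do not exist yet.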

For automatic configuration for multiple users, please take a look at scripts/REDCAP-FITBIT-AUTH-AUTO/README.md.

Copy docker/source-fitbit.properties.template to docker/source-fitbit.properties and enter your Fitbit App client ID and client secret. The following table shows the possible properties.

| Name | Description | Type | Default | Valid Values | Importance |
|------|-------------|------|---------|--------------|------------|
| application.loop.interval.ms | How often to perform the main application loop (only controls how often to poll for new user registrations). | long | 300000 | | |
| user.cache.refresh.interval.ms | How often to invalidate the cache and poll for new user registrations. | long | 3600000 | | |
| rest.source.poll.interval.ms | How often to poll the source URL. | long | 60000 | | low |
| rest.source.base.url | Base URL for REST source connector. | string | | | high |
| rest.source.destination.topics | The list of destination topics for the REST source connector. | list | "" | | high |
| rest.source.topic.selector | The topic selector class for REST source connector. | class | org.radarbase.connect.rest.selector.SimpleTopicSelector | Class extending org.radarbase.connect.rest.selector.TopicSelector | high |
| rest.source.payload.converter.class | Class to be used to convert messages from REST calls to SourceRecords | class | org.radarbase.connect.rest.converter.StringPayloadConverter | Class extending org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter | low |
| rest.source.request.generator.class | Class to be used to generate REST requests | class | org.radarbase.connect.rest.single.SingleRequestGenerator | Class extending org.radarbase.connect.rest.request.RequestGenerator | low |
| fitbit.users | The user ID of Fitbit users to include in polling, separated by commas. Non existing user names will be ignored. If empty, all users in the user directory will be used. | list | "" | | high |
| fitbit.api.client | Client ID for the Fitbit API | string | | non-empty string | high |
| fitbit.api.secret | Secret for the Fitbit API client set in fitbit.api.client. | password | | | high |
| fitbit.user.poll.interval | Polling interval per Fitbit user per request route in seconds. | int | 150 | | medium |
| fitbit.api.intraday | Set to true if the client has permissions to Fitbit Intraday API, false otherwise. | boolean | false | | medium |
| fitbit.user.repository.class | Class for managing users and authentication. | class | org.radarbase.connect.rest.fitbit.user.YamlUserRepository | Class extending org.radarbase.connect.rest.fitbit.user.UserRepository | medium |
| fitbit.user.dir | Directory containing Fitbit user information and credentials. Only used if a file-based user repository is configured. | string | /var/lib/kafka-connect-fitbit-source/users | | low |
| fitbit.user.repository.url | URL for webservice containing user credentials. Only used if a webservice-based user repository is configured. | string | "" | | low |
| fitbit.user.repository.client.id | Client ID for connecting to the service repository. | string | "" | | medium |
| fitbit.user.repository.client.secret | Client secret for connecting to the service repository. | string | "" | | medium |
| fitbit.user.repository.oauth2.token.url | OAuth 2.0 token url for retrieving client credentials. | string | "" | | medium |
| fitbit.intraday.steps.topic | Topic for Fitbit intraday steps | string | connect_fitbit_intraday_steps | non-empty string without control characters | low |
| fitbit.intraday.heart.rate.topic | Topic for Fitbit intraday heart_rate | string | connect_fitbit_intraday_heart_rate | non-empty string without control characters | low |
| fitbit.sleep.stages.topic | Topic for Fitbit sleep stages | string | connect_fitbit_sleep_stages | non-empty string without control characters | low |
| fitbit.sleep.classic.topic | Topic for Fitbit sleep classic data | string | connect_fitbit_sleep_classic | non-empty string without control characters | low |
| fitbit.time.zone.topic | Topic for Fitbit profile time zone | string | connect_fitbit_time_zone | non-empty string without control characters | low |
| fitbit.activity.log.topic | Topic for Fitbit activity log. | string | connect_fitbit_activity_log | non-empty string without control characters | low |
| fitbit.intraday.calories.topic | Topic for Fitbit intraday calories | string | connect_fitbit_intraday_calories | non-empty string without control characters | low |
| fitbit.user.firebase.collection.fitbit.name | Firestore Collection for retrieving Fitbit Auth details. Only used when a Firebase based user repository is used. | string | fitbit | | low |
| fitbit.user.firebase.collection.user.name | Firestore Collection for retrieving User details. Only used when a Firebase based user repository is used. | string | users | | low |
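As an illustration of how these properties fit together, the sketch below embeds a small example properties file and checks it before the stack is started. It is not part of the connector. The property names come from the table above; the placeholder values, the base URL `https://api.fitbit.com`, and the choice to treat the properties without a listed default as required are assumptions.

```python
# Assumption: properties with no default in the table must be set by hand.
REQUIRED_KEYS = (
    "rest.source.base.url",
    "fitbit.api.client",
    "fitbit.api.secret",
)

# Example content for docker/source-fitbit.properties (placeholder values).
SAMPLE = """\
# Fitbit App credentials
fitbit.api.client=YOUR_CLIENT_ID
fitbit.api.secret=YOUR_CLIENT_SECRET
# Only set to true if the app has been granted intraday access
fitbit.api.intraday=false
rest.source.base.url=https://api.fitbit.com
fitbit.user.dir=/var/lib/kafka-connect-fitbit-source/users
"""


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def missing_keys(props):
    """Return the required keys that are absent or empty."""
    return [key for key in REQUIRED_KEYS if not props.get(key)]


if __name__ == "__main__":
    print("missing:", missing_keys(parse_properties(SAMPLE)))
```

The parser only handles plain `key=value` lines, which is enough for a quick check but does not cover the full Java properties syntax (line continuations, escapes).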

If the ManagementPortal is used to authenticate against the user repository, please add an OAuth client to ManagementPortal with the following properties:

Client ID: fitbit.user.repository.client.id
Client Secret: fitbit.user.repository.client.secret
Scope: SUBJECT.READ MEASUREMENT.CREATE
Resources: res_restAuthorizer
Grant types: client_credentials
Access Token validity: 600
Refresh Token validity: 0

Finally, set fitbit.user.repository.oauth2.token.url to http://managementportal-app:8080/managementportal/oauth/token.
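To check that the OAuth client has been registered correctly, a token can be requested by hand. The sketch below builds a standard OAuth 2.0 client_credentials request (form-encoded body, HTTP Basic client authentication). It assumes ManagementPortal accepts this standard form; the helper name `build_token_request` is hypothetical, and this is not how the connector itself is implemented.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(token_url, client_id, client_secret):
    """Build (but do not send) a client_credentials token request."""
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    body = urllib.parse.urlencode(
        {"grant_type": "client_credentials"}
    ).encode("ascii")
    return urllib.request.Request(
        token_url,
        data=body,
        method="POST",
        headers={
            # Client ID and secret are sent as HTTP Basic credentials.
            "Authorization": "Basic "
            + base64.b64encode(credentials).decode("ascii"),
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )
```

Passing the result to `urllib.request.urlopen` sends the request. Note that the host name managementportal-app is likely only resolvable from inside the Docker network, so the request would have to be sent from a container in that network.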

Now you can run a full Kafka stack using

docker-compose up -d --build

Inspect the progress with docker-compose logs -f radar-fitbit-connector. To inspect data that is coming out of the requests, run

docker-compose exec schema-registry-1 kafka-avro-console-consumer \
  --bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 \
  --from-beginning \
  --topic connect_fitbit_intraday_heart_rate

Flows

The following diagrams show the flow of the Fitbit source connector. The Fitbit source connector is a Kafka Connect source connector that polls the Fitbit API for data. The data is then converted to Avro records and sent to Kafka topics.

Initialization

On startup, the Fitbit connector optionally checks Kafka readiness and then schedules its regular polling tasks.

sequenceDiagram
  participant connector as Fitbit Source Connector
  participant kafka as Kafka

  connector ->> kafka: Check Kafka readiness (optional)
  connector ->> connector: Schedule polling tasks

Regular operation

The Fitbit connector operates by regularly polling the user repository, and regularly polling all configured users for data.

sequenceDiagram
  participant connector as Fitbit Source Connector
  participant userRepo as User Repository (rest-source-auth)
  participant fitbit as Fitbit API
  participant kafka as Kafka
    
  note over connector: Get users (every 5 minutes)
  connector ->> userRepo: Get users @ /users?source-type=FitBit
  note over connector: For each user (every 5 seconds)
  connector ->> connector: What data should be fetched?  
  connector ->> userRepo: Get fitbit access token @ users/<id>/token
  connector ->> fitbit: Get required data @ api.fitbit.com/1/user/<id>/<data-type>/date/<daterange>
  connector ->> kafka: Send data
  kafka ->> connector: 200 OK
  connector ->> connector: Update offset times
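The per-user part of this flow can be summarised in a few lines of code. The following is a simplified sketch of the sequence in the diagram, not the connector's actual implementation: all names (`poll_user`, `get_token`, `fetch`, `send`, `offsets`) are hypothetical, and deciding what to fetch from a stored offset time with a one-day default window is an assumption.

```python
from datetime import datetime, timedelta, timezone


def poll_user(user_id, offsets, now, get_token, fetch, send,
              window=timedelta(days=1)):
    """Run one polling round for a single user; return the record count.

    get_token, fetch and send stand in for the user repository,
    the Fitbit API and the Kafka producer respectively.
    """
    # What data should be fetched? Everything since the last offset.
    start = offsets.get(user_id, now - window)
    if start >= now:
        return 0

    token = get_token(user_id)                  # user repository
    records = fetch(user_id, token, start, now)  # Fitbit API
    for record in records:
        send(record)                             # Kafka

    # Update the offset only after every record has been sent.
    offsets[user_id] = now
    return len(records)
```

Because the offset is written last, a failure while fetching or sending leaves it unchanged, so the same time range is requested again in the next round.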

Contributing

Code should be formatted using the Google Java Code Style Guide. If you want to contribute a feature or fix, browse our issues and make a pull request.