Commit

Recover some of the improvements from reverted branch and bump version to 0.8
castorm committed Nov 7, 2020
1 parent ffbb35f commit 2eb57f1
Showing 38 changed files with 1,021 additions and 598 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -4,10 +4,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

---

## v0.7
## v0.8

### v0.7.7 (TBD)
### v0.8.0 (TBD)
- Provided different log levels for `OkHttpClient` (`TRACE`: `BODY`, `DEBUG`: `BASIC`, `*`: `NONE`)
- Refactored the throttler, adding the notion of a timer

### v0.7.6 (05/28/2020)
- Fix typo in `http.throttler.catchup.interval.millis` configuration property
129 changes: 69 additions & 60 deletions README.md
@@ -64,7 +64,7 @@ These are better understood by looking at the source task implementation:
```java
public List<SourceRecord> poll() throws InterruptedException {

throttler.throttle(offset);
throttler.throttle(offset.getTimestamp().orElseGet(Instant::now));

HttpRequest request = requestFactory.createRequest(offset);

@@ -82,6 +82,57 @@ public void commitRecord(SourceRecord record) {
}
```
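The throttle call above is now driven by the timestamp of the last seen offset, falling back to the current instant when the offset carries none.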

---
<a name="timer"/>

### Timer: Throttling HttpRequests
Controls the rate at which HTTP requests are executed by informing the task how long until the next execution is due.

> #### `http.timer`
> ```java
> public interface Timer extends Configurable {
>
> Long getRemainingMillis();
>
> default void reset(Instant lastZero) {
> // Do nothing
> }
> }
> ```
> * Type: `Class`
> * Default: `com.github.castorm.kafka.connect.timer.AdaptableIntervalTimer`
> * Available implementations:
> * `com.github.castorm.kafka.connect.timer.FixedIntervalTimer`
> * `com.github.castorm.kafka.connect.timer.AdaptableIntervalTimer`
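Plugging in a custom implementation only requires implementing the interface above. The sketch below is illustrative, not part of this commit: it assumes `Timer` lives in `com.github.castorm.kafka.connect.timer` (as the defaults above suggest), that `Configurable` declares `configure(Map<String, ?>)`, and the property name it reads is hypothetical.

```java
import com.github.castorm.kafka.connect.timer.Timer; // package inferred from the defaults above

import java.time.Duration;
import java.time.Instant;
import java.util.Map;

// Illustrative fixed-countdown Timer: counts down from a configurable interval,
// restarting whenever the task calls reset(...) after a poll.
public class MyCountdownTimer implements Timer {

    private volatile Instant lastZero = Instant.now();
    private long intervalMillis = 60_000L;

    @Override
    public void configure(Map<String, ?> settings) {
        // "my.timer.interval.millis" is a hypothetical property name for this sketch
        Object interval = settings.get("my.timer.interval.millis");
        if (interval != null) {
            intervalMillis = Long.parseLong(String.valueOf(interval));
        }
    }

    @Override
    public Long getRemainingMillis() {
        long elapsedMillis = Duration.between(lastZero, Instant.now()).toMillis();
        return Math.max(intervalMillis - elapsedMillis, 0L);
    }

    @Override
    public void reset(Instant lastZero) {
        // Instant from which the next countdown starts
        this.lastZero = lastZero;
    }
}
```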
#### Throttling HttpRequests with FixedIntervalTimer
Throttles the rate of requests based on a fixed interval.
> ##### `http.timer.interval.millis`
> Interval between requests
> * Type: `Long`
> * Default: `60000`
#### Throttling HttpRequests with AdaptableIntervalTimer
Throttles the rate of requests based on a fixed interval. However, it has two modes of operation, with two different
intervals:
* Up to date: No new records, or they have been created since the last poll
* Catching up: There were new records in the last poll, but they were created long ago (longer than the interval)
> ##### `http.timer.interval.millis`
> Interval between requests when up to date
>
> * Type: `Long`
> * Default: `60000`
>
> ##### `http.timer.catchup.interval.millis`
> Interval between requests when catching up
> * Type: `Long`
> * Default: `30000`
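With the defaults above, the connector polls every minute while up to date and every 30 seconds while catching up; spelled out explicitly, that configuration looks like this:

```properties
http.timer=com.github.castorm.kafka.connect.timer.AdaptableIntervalTimer
http.timer.interval.millis=60000
http.timer.catchup.interval.millis=30000
```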
---
<a name="request"/>
@@ -95,7 +146,7 @@ The first thing our connector will need to do is create an `HttpRequest`.
> HttpRequest createRequest(Offset offset);
> }
> ```
> * Type: Class
> * Type: `Class`
> * Default: `com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactory`
> * Available implementations:
> * `com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactory`
@@ -116,7 +167,7 @@ This `HttpRequestFactory` is based on template resolution.
>
> ##### `http.request.url`
> Http url to use in the request.
> * **Required**
> * __Required__
> * Type: `String`
>
> ##### `http.request.headers`
@@ -150,21 +201,25 @@ This `HttpRequestFactory` is based on template resolution.
> ```
> Class responsible for creating the templates that will be used on every request.
> * Type: `Class`
> * Default: `com.github.castorm.kafka.connect.http.request.template.freemarker.FreeMarkerTemplateFactory`
> * Default: `com.github.castorm.kafka.connect.http.request.template.freemarker.BackwardsCompatibleFreeMarkerTemplateFactory`
> * Available implementations:
> * `com.github.castorm.kafka.connect.http.request.template.freemarker.BackwardsCompatibleFreeMarkerTemplateFactory`
Implementation based on [FreeMarker](https://freemarker.apache.org/), which accepts offset properties without the
`offset` namespace _(Deprecated)_
> * `com.github.castorm.kafka.connect.http.request.template.freemarker.FreeMarkerTemplateFactory`
Implementation based on [FreeMarker](https://freemarker.apache.org/)
> * `com.github.castorm.kafka.connect.http.request.template.NoTemplateFactory`
##### Creating an HttpRequest with FreeMarkerTemplateFactory
FreeMarker templates will have the following data model available:
* key
* timestamp
* ... _(custom offset properties)_
* offset
  * key
  * timestamp
  * ... _(custom offset properties)_
Accessing any of the above within a template can be achieved like this:
```properties
http.request.params=after=${timestamp}
http.request.params=after=${offset.timestamp}
```
For a complete understanding of the features provided by FreeMarker, please refer to the
[User Manual](https://freemarker.apache.org/docs/index.html)
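FreeMarker built-ins can also transform offset values inline, e.g. reformatting the offset timestamp into a custom date pattern, as the Jira example in this commit does:

```properties
http.request.params=after=${offset.timestamp?datetime.iso?string['yyyy/MM/dd HH:mm']}
```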
@@ -302,9 +357,9 @@ Parses the HTTP response into a key-value SourceRecord. This process is decomposed
> * Type: `Class`
> * Default: `com.github.castorm.kafka.connect.http.record.SchemedKvSourceRecordMapper`
> * Available implementations:
> * `com.github.castorm.kafka.connect.http.record.SchemedKvSourceRecordMapper` Maps **key** to a *Struct schema*
> with a single property `key`, and **value** to a *Struct schema* with a single property `value`
> * `com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper` Maps both **key** and **value** to
> * `com.github.castorm.kafka.connect.http.record.SchemedKvSourceRecordMapper` Maps __key__ to a *Struct schema*
> with a single property `key`, and __value__ to a *Struct schema* with a single property `value`
> * `com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper` Maps both __key__ and __value__ to
> a `String` schema
##### Parsing an HttpResponse with JacksonKvRecordHttpResponseParser
@@ -365,7 +420,7 @@ Uses [Jackson](https://github.com/FasterXML/jackson) to look for the records in
> * Default: `UTC`
>
> ##### `http.response.record.offset.pointer`
> Comma separated list of `key=/value` pairs where the key is the name of the property in the offset and the value is
> Comma separated list of `key=/value` pairs where the key is the name of the property in the offset, and the value is
> the [JsonPointer](https://tools.ietf.org/html/rfc6901) to the value being used as offset for future requests.
> This is the mechanism that enables sharing state between `HttpRequests`. `HttpRequestFactory` implementations
> receive this `Offset`.
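For example, the quickstart configuration updated in this commit captures both the record id and its update timestamp into the offset:

```properties
http.response.record.offset.pointer=key=/id, timestamp=/updatedAt
```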
@@ -391,7 +446,7 @@ Here is also where we'll tell Kafka Connect to what topic and on what partition
> ##### `kafka.topic`
> Name of the topic the record will be sent to
> * **Required**
> * __Required__
> * Type: `String`
> * Default: `""`
>
@@ -473,52 +528,6 @@ Assumptions:
* There is an `Offset` property that uniquely identifies records (e.g. key)
* There won't be new items preceding already seen ones
---
<a name="throttler"/>
### Throttler: Throttling HttpRequests
Controls the rate at which HTTP requests are executed.
> #### `http.throttler`
> ```java
> public interface Throttler extends Configurable {
>
> void throttle(Offset offset) throws InterruptedException;
> }
> ```
> * Type: Class
> * Default: `com.github.castorm.kafka.connect.throttle.FixedIntervalThrottler`
> * Available implementations:
> * `com.github.castorm.kafka.connect.throttle.FixedIntervalThrottler`
> * `com.github.castorm.kafka.connect.throttle.AdaptableIntervalThrottler`
#### Throttling HttpRequests with FixedIntervalThrottler
Throttles rate of requests based on a fixed interval.
> ##### `http.throttler.interval.millis`
> Interval in between requests
> * Type: Long
> * Default: 10000
#### Throttling HttpRequests with AdaptableIntervalThrottler
Throttles rate of requests based on a fixed interval. However, it has two modes of operation, with two different
intervals:
* Up to date: No new records, or they have been created since last poll
* Catching up: There were new records in the last poll and they were created long ago
> ##### `http.throttler.interval.millis`
> Interval in between requests when up-to-date
>
> * Type: Long
> * Default: 10000
>
> ##### `http.throttler.catchup.interval.millis`
> Interval in between requests when catching up
> * Type: Long
> * Default: 1000
---
## Development
@@ -544,7 +553,7 @@ We use [SemVer](http://semver.org/) for versioning.
## Authors
* **Cástor Rodríguez** - Only contributor so far - [castorm](https://github.com/castorm)
* __Cástor Rodríguez__ - Only contributor so far - [castorm](https://github.com/castorm)
## License
11 changes: 5 additions & 6 deletions config/quickstart-http.properties
@@ -1,11 +1,10 @@
name=http-source
connector.class=com.github.castorm.kafka.connect.http.HttpSourceConnector
http.offset.initial="updatedAt=2020-05-08T07:55:44Z"
http.offset.initial=timestamp=2020-05-08T07:55:44Z
http.request.url=https://your-host-here/rest/api/2/search
http.request.headers="Authorization=Basic TBD, Accept=application/json"
http.request.params="updatedAtFrom=${updatedAt}"
http.request.headers=Authorization=Basic TBD, Accept=application/json
http.request.params=updatedAtFrom=${timestamp}
http.response.list.pointer=/items
http.response.record.key.pointer=/id
http.response.record.timestamp.pointer="updatedAt=/updatedAt"
http.throttler.interval.millis=30000
http.response.record.offset.pointer=key=/id, timestamp=/updatedAt
http.timer.interval.millis=30000
kafka.topic=topic
9 changes: 4 additions & 5 deletions examples/elasticsearch-search.md
@@ -61,14 +61,13 @@ queries.
"http.request.url": "http://domain/index_name/_search",
"http.request.method": "POST",
"http.request.headers": "Content-Type: application/json",
"http.request.body": "{\"size\": 100, \"sort\": [{\"@timestamp\": \"asc\"}], \"search_after\": [${timestamp}]}",
"http.request.body": "{\"size\": 100, \"sort\": [{\"@timestamp\": \"asc\"}], \"search_after\": [${offset.timestamp}]}",
"http.response.list.pointer": "/hits/hits",
"http.response.record.key.pointer": "/_id",
"http.response.record.pointer": "/_source",
"http.response.record.key.pointer": "/_id",
"http.response.record.offset.pointer": "timestamp=/sort/0",
"http.response.record.timestamp.pointer": "/sort/0",
"http.throttler.interval.millis": "30000",
"http.throttler.catchup.interval.millis": "1000",
"http.timer.interval.millis": "30000",
"http.timer.catchup.interval.millis": "1000",
"kafka.topic": "topic"
}
}
8 changes: 4 additions & 4 deletions examples/jira-issues-search.md
@@ -49,12 +49,12 @@ And based on the results we would be updating the `updated` filter for subsequent
"http.offset.initial": "timestamp=2020-05-08T07:55:44Z",
"http.request.url": "https://your-host-here/rest/api/2/search",
"http.request.headers": "Authorization: Basic TBD, Accept: application/json",
"http.request.params": "jql=updated>=\"${timestamp?datetime.iso?string['yyyy/MM/dd HH:mm']}\" ORDER BY updated ASC&maxResults=100",
"http.request.params": "jql=updated>=\"${offset.timestamp?datetime.iso?string['yyyy/MM/dd HH:mm']}\" ORDER BY updated ASC&maxResults=100",
"http.response.list.pointer": "/issues",
"http.response.record.key.pointer": "/id",
"http.response.record.timestamp.pointer": "/fields/updated",
"http.throttler.interval.millis": "30000",
"http.throttler.catchup.interval.millis": "1000",
"http.response.record.offset.pointer": "timestamp=/fields/updated",
"http.timer.interval.millis": "30000",
"http.timer.catchup.interval.millis": "1000",
"kafka.topic": "topic"
}
}
2 changes: 1 addition & 1 deletion pom.xml
@@ -5,7 +5,7 @@

<groupId>com.github.castorm</groupId>
<artifactId>kafka-connect-http</artifactId>
<version>0.7.7-SNAPSHOT</version>
<version>0.8.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>Kafka Connect HTTP</name>
ConfigUtils.java
@@ -30,7 +30,6 @@
import java.util.stream.Collector;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
@@ -54,55 +53,64 @@ public static Map<String, String> breakDownMap(String mapString) {
return breakDownPairs(mapString, ",", "=");
}

private static Map<String, String> breakDownPairs(String itemLine, String itemSplitter, String pairSplitter) {
return breakDownPairs(itemLine, itemSplitter, pairSplitter, toMap(Entry::getKey, Entry::getValue));
public static List<Map<String, String>> breakDownMapList(String mapList) {
return breakDownList(mapList, ";")
.map(ConfigUtils::breakDownMap)
.collect(toList());
}

private static <T> Map<String, T> breakDownPairs(String itemLine, String itemSplitter, String pairSplitter, Collector<Entry<String, String>, ?, Map<String, T>> collector) {
if (itemLine == null || itemLine.length() == 0) {
return emptyMap();
}
return Stream.of(itemLine.split(itemSplitter))
.map(headerLine -> toPair(headerLine, pairSplitter))
.collect(collector);
private static Map<String, String> breakDownPairs(String itemLine, String itemSplitter, String pairSplitter) {
return breakDownPairs(itemLine, itemSplitter, pairSplitter, toMap(Entry::getKey, Entry::getValue));
}

private static Map<String, List<String>> breakDownMultiValuePairs(String itemLine, String itemSplitter, String pairSplitter) {
return breakDownPairs(itemLine, itemSplitter, pairSplitter, groupingBy(Entry::getKey, mapping(Entry::getValue, toList())));
}

private static Entry<String, String> toPair(String pairLine, String pairSplitter) {
private static <T> Map<String, T> breakDownPairs(String itemList, String itemSplitter, String pairSplitter, Collector<Entry<String, String>, ?, Map<String, T>> collector) {
return breakDownList(itemList, itemSplitter)
.map(headerLine -> breakDownPair(headerLine, pairSplitter))
.collect(collector);
}

private static Entry<String, String> breakDownPair(String pairLine, String pairSplitter) {
String[] parts = pairLine.split(pairSplitter, 2);
if (parts.length < 2) {
throw new IllegalStateException("Incomplete pair: " + pairLine);
}
return new SimpleEntry<>(parts[0].trim(), parts[1].trim());
}

public static Set<Integer> parseIntegerRangedList(String rangedList) {
return Stream.of(rangedList.split(","))
.map(String::trim)
.map(ConfigUtils::parseIntegerRanged)
.flatMap(Set::stream)
.collect(toSet());
public static List<String> breakDownList(String itemList) {
return breakDownList(itemList, ",")
.collect(toList());
}

public static List<String> breakDownList(String itemList) {
return Stream.of(itemList.split(","))
private static Stream<String> breakDownList(String itemList, String splitter) {
if (itemList == null || itemList.length() == 0) {
return Stream.empty();
}
return Stream.of(itemList.split(splitter))
.map(String::trim)
.filter(it -> !it.isEmpty())
.collect(toList());
.filter(it -> !it.isEmpty());
}

public static Set<Integer> parseIntegerRangedList(String rangedList) {
return breakDownList(rangedList, ",")
.map(ConfigUtils::parseIntegerRange)
.flatMap(Set::stream)
.collect(toSet());
}

private static Set<Integer> parseIntegerRanged(String range) {
private static Set<Integer> parseIntegerRange(String range) {
String[] rangeString = range.split("\\.\\.");
if (rangeString.length == 0 || rangeString[0].length() == 0) {
return emptySet();
} else if (rangeString.length == 1) {
return asSet(Integer.valueOf(rangeString[0].trim()));
} else if (rangeString.length == 2) {
Integer from = Integer.valueOf(rangeString[0].trim());
Integer to = Integer.valueOf(rangeString[1].trim());
int from = Integer.parseInt(rangeString[0].trim());
int to = Integer.parseInt(rangeString[1].trim());
return (from < to ? rangeClosed(from, to) : rangeClosed(to, from)).boxed().collect(toSet());
}
throw new IllegalStateException(String.format("Invalid range definition %s", range));
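As a usage sketch (not part of the commit; expected results inferred from the implementation above), these utilities break configuration strings down as follows:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Assumes this class sits in the same package as ConfigUtils (package not shown in this diff).
public class ConfigUtilsDemo {

    public static void main(String[] args) {
        // Comma-separated pairs become a map
        Map<String, String> headers =
                ConfigUtils.breakDownMap("Authorization=Basic TBD, Accept=application/json");
        // {Authorization=Basic TBD, Accept=application/json}

        // Semicolon-separated groups of pairs become a list of maps (new in this commit)
        List<Map<String, String>> pointers =
                ConfigUtils.breakDownMapList("key=/id, timestamp=/updatedAt; key=/uid");
        // [{key=/id, timestamp=/updatedAt}, {key=/uid}]

        // Ranges expand inclusively, single values stand alone
        Set<Integer> statusCodes = ConfigUtils.parseIntegerRangedList("200..204, 404");
        // [200, 201, 202, 203, 204, 404]

        System.out.println(headers);
        System.out.println(pointers);
        System.out.println(statusCodes);
    }
}
```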