Updated Changefeed to be able to read last consumable. Also unified cursor. (#13979)
gapra-msft authored Aug 13, 2020
1 parent 8df9e17 commit eb68505
Showing 66 changed files with 8,679 additions and 29,917 deletions.
49 changes: 45 additions & 4 deletions sdk/storage/azure-storage-blob-changefeed/README.md
@@ -118,25 +118,26 @@ tasks, including:
- [Get events](#get-events)
- [Get events between a start and end time](#get-events-start-end)
- [Resume with a cursor](#get-events-cursor)
- [Poll for events with a cursor](#poll-events-cursor)

### Create a `BlobChangefeedClient`

<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L23-L23 -->
<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L26-L26 -->
```java
client = new BlobChangefeedClientBuilder(blobServiceClient).buildClient();
```
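
The builder consumes an existing `BlobServiceClient`. A minimal sketch of constructing that dependency, assuming connection-string authentication (the placeholder value is illustrative):

```java
BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
    .connectionString("<your-storage-connection-string>")
    .buildClient();
BlobChangefeedClient client = new BlobChangefeedClientBuilder(blobServiceClient).buildClient();
```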

### Get events

<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L27-L28 -->
<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L30-L31 -->
```java
client.getEvents().forEach(event ->
System.out.printf("Topic: %s, Subject: %s%n", event.getTopic(), event.getSubject()));
```
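
A `BlobChangefeedEvent` exposes more than the topic and subject. A short sketch of reading the event type, time, and blob URL, assuming the event model shipped in this package:

```java
client.getEvents().forEach(event -> {
    // The event type and time identify what happened and when.
    System.out.printf("Type: %s, Time: %s%n", event.getEventType(), event.getEventTime());
    // The data payload, when present, carries blob-level details.
    if (event.getData() != null) {
        System.out.printf("Blob URL: %s%n", event.getData().getBlobUrl());
    }
});
```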

### Get events between a start and end time

<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L32-L36 -->
<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L35-L39 -->
```java
OffsetDateTime startTime = OffsetDateTime.MIN;
OffsetDateTime endTime = OffsetDateTime.now();
@@ -147,7 +148,7 @@ client.getEvents(startTime, endTime).forEach(event ->

### Resume with a cursor

<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L40-L56 -->
<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L43-L59 -->
```java
BlobChangefeedPagedIterable iterable = client.getEvents();
Iterable<BlobChangefeedPagedResponse> pages = iterable.iterableByPage();
@@ -168,6 +169,46 @@ client.getEvents(cursor).forEach(event ->
System.out.printf("Topic: %s, Subject: %s%n", event.getTopic(), event.getSubject()));
```
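
Because the cursor is an opaque string, it can be persisted anywhere between runs. A minimal sketch using a local file; the path is illustrative and error handling is elided:

```java
Path cursorFile = Paths.get("changefeed-cursor.txt");
Files.write(cursorFile, cursor.getBytes(StandardCharsets.UTF_8));

// In a later run, load the saved cursor and resume where the previous run stopped.
String savedCursor = new String(Files.readAllBytes(cursorFile), StandardCharsets.UTF_8);
client.getEvents(savedCursor).forEach(event ->
    System.out.printf("Topic: %s, Subject: %s%n", event.getTopic(), event.getSubject()));
```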

### Poll for events with a cursor

<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L63-L96 -->
```java
List<BlobChangefeedEvent> changefeedEvents = new ArrayList<BlobChangefeedEvent>();

/* Get the start time. The change feed client will round start time down to the nearest hour if you provide
an OffsetDateTime with minutes and seconds. */
OffsetDateTime startTime = OffsetDateTime.now();

/* Get your polling interval. */
long pollingInterval = 1000 * 60 * 5; /* 5 minutes. */

/* Get initial set of events. */
Iterable<BlobChangefeedPagedResponse> pages = client.getEvents(startTime, null).iterableByPage();

String continuationToken = null;

while (true) {
for (BlobChangefeedPagedResponse page : pages) {
changefeedEvents.addAll(page.getValue());
/*
     * Get the change feed cursor. The cursor is not required to get each page of events;
     * it is intended to be saved and used to resume iterating at a later date.
*/
continuationToken = page.getContinuationToken();
}

/* Wait before processing next batch of events. */
try {
Thread.sleep(pollingInterval);
} catch (InterruptedException e) {
e.printStackTrace();
}

/* Resume from last continuation token and fetch latest set of events. */
pages = client.getEvents(continuationToken).iterableByPage();
}
```
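
Two caveats with this loop: `Thread.sleep` swallows the interrupt for brevity, so production code would typically restore the interrupt status with `Thread.currentThread().interrupt()` and exit (or drive the poll from a scheduler); and `continuationToken` remains `null` until at least one page has been read, so a consumer may want to guard the resume call until a token has actually been observed.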

## Troubleshooting
When interacting with blobs using this Java client library, errors returned by the service correspond to the same HTTP
status codes returned for [REST API][error_codes] requests. For example, if you try to retrieve a container or blob that
BlobChangefeedPagedFlux.java
@@ -7,8 +7,8 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.paging.ContinuablePage;
import com.azure.core.util.paging.ContinuablePagedFlux;
import com.azure.storage.blob.changefeed.implementation.models.BlobChangefeedEventWrapper;
import com.azure.storage.blob.changefeed.implementation.models.ChangefeedCursor;
import com.azure.storage.blob.changefeed.implementation.models.BlobChangefeedEventWrapper;
import com.azure.storage.blob.changefeed.models.BlobChangefeedEvent;
import com.azure.storage.common.implementation.StorageImplUtils;
import reactor.core.CoreSubscriber;
@@ -27,10 +27,7 @@ public final class BlobChangefeedPagedFlux extends ContinuablePagedFlux<String,

private final ClientLogger logger = new ClientLogger(BlobChangefeedPagedFlux.class);

private final ChangefeedFactory changefeedFactory;
private final OffsetDateTime startTime;
private final OffsetDateTime endTime;
private final String cursor;
private final Changefeed changefeed;

private static final Integer DEFAULT_PAGE_SIZE = 5000;

@@ -39,21 +36,15 @@ public final class BlobChangefeedPagedFlux extends ContinuablePagedFlux<String,
*/
BlobChangefeedPagedFlux(ChangefeedFactory changefeedFactory, OffsetDateTime startTime, OffsetDateTime endTime) {
StorageImplUtils.assertNotNull("changefeedFactory", changefeedFactory);
this.changefeedFactory = changefeedFactory;
this.startTime = startTime;
this.endTime = endTime;
this.cursor = null;
this.changefeed = changefeedFactory.getChangefeed(startTime, endTime);
}

/**
* Creates an instance of {@link BlobChangefeedPagedFlux}.
*/
BlobChangefeedPagedFlux(ChangefeedFactory changefeedFactory, String cursor) {
StorageImplUtils.assertNotNull("changefeedFactory", changefeedFactory);
this.changefeedFactory = changefeedFactory;
this.startTime = null;
this.endTime = null;
this.cursor = cursor;
this.changefeed = changefeedFactory.getChangefeed(cursor);
}

@Override
@@ -95,22 +86,15 @@ public Flux<BlobChangefeedPagedResponse> byPage(int preferredPageSize) {
public Flux<BlobChangefeedPagedResponse> byPage(String continuationToken, int preferredPageSize) {

if (continuationToken != null) {
return FluxUtil.fluxError(logger, new UnsupportedOperationException("continuationToken not supported. Use "
+ "client.getEvents(String) to pass in a cursor."));
return FluxUtil.pagedFluxError(logger, new UnsupportedOperationException("continuationToken not "
+ "supported. Use client.getEvents(String) to pass in a cursor."));
}
if (preferredPageSize <= 0) {
return FluxUtil.fluxError(logger, new IllegalArgumentException("preferredPageSize > 0 required but "
return FluxUtil.pagedFluxError(logger, new IllegalArgumentException("preferredPageSize > 0 required but "
+ "provided: " + preferredPageSize));
}
preferredPageSize = Integer.min(preferredPageSize, DEFAULT_PAGE_SIZE);

Changefeed changefeed;
if (cursor != null) {
changefeed = changefeedFactory.getChangefeed(cursor);
} else {
changefeed = changefeedFactory.getChangefeed(startTime, endTime);
}

return changefeed.getEvents()
/* Window the events to the page size. This takes the Flux<BlobChangefeedEventWrapper> and
transforms it into a Flux<Flux<BlobChangefeedEventWrapper>>, where the internal Fluxes can have at most
@@ -139,8 +123,7 @@ public Flux<BlobChangefeedPagedResponse> byPage(String continuationToken, int preferredPageSize) {

@Override
public void subscribe(CoreSubscriber<? super BlobChangefeedEvent> coreSubscriber) {
byPage(null, DEFAULT_PAGE_SIZE)
.flatMap((page) -> Flux.fromIterable(page.getElements()))
changefeed.getEvents().map(BlobChangefeedEventWrapper::getEvent)
.subscribe(coreSubscriber);
}
}
@@ -51,7 +51,8 @@ public Flux<ByteBuffer> download() {
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> downloadFunc = (range, conditions)
-> client.downloadWithResponse(range, null, conditions, false);

return ChunkedDownloadUtils.downloadFirstChunk(range, options, requestConditions, downloadFunc)
/* We don't etag lock since the Changefeed can append to the blob while we are reading it. */
return ChunkedDownloadUtils.downloadFirstChunk(range, options, requestConditions, downloadFunc, false)
.flatMapMany(setupTuple3 -> {
long newCount = setupTuple3.getT1();
BlobRequestConditions finalConditions = setupTuple3.getT2();
Changefeed.java
@@ -12,14 +12,14 @@
import com.azure.storage.blob.changefeed.implementation.util.TimeUtils;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.OffsetDateTime;
import java.util.Objects;

/**
* A class that represents a Changefeed.
@@ -38,15 +38,11 @@ class Changefeed {

private static final String SEGMENT_PREFIX = "idx/segments/";
private static final String METADATA_SEGMENT_PATH = "meta/segments.json";
private static final ObjectMapper MAPPER = new ObjectMapper();

private final BlobContainerAsyncClient client; /* Changefeed container */
private final OffsetDateTime startTime; /* User provided start time. */
private final OffsetDateTime endTime; /* User provided end time. */
private OffsetDateTime lastConsumable; /* Last consumable time. The latest time the changefeed can safely be
read from.*/
private OffsetDateTime safeEndTime; /* Soonest time between lastConsumable and endTime. */
private final ChangefeedCursor cfCursor; /* Cursor associated with changefeed. */
private final ChangefeedCursor changefeedCursor; /* Cursor associated with changefeed. */
private final ChangefeedCursor userCursor; /* User provided cursor. */
private final SegmentFactory segmentFactory; /* Segment factory. */

@@ -56,13 +52,28 @@ class Changefeed {
Changefeed(BlobContainerAsyncClient client, OffsetDateTime startTime, OffsetDateTime endTime,
ChangefeedCursor userCursor, SegmentFactory segmentFactory) {
this.client = client;
this.startTime = startTime;
this.endTime = endTime;
this.startTime = TimeUtils.roundDownToNearestHour(startTime);
this.endTime = TimeUtils.roundUpToNearestHour(endTime);
this.userCursor = userCursor;
this.segmentFactory = segmentFactory;

this.cfCursor = new ChangefeedCursor(this.endTime);
this.safeEndTime = endTime;
String urlHost = null;
try {
urlHost = new URL(client.getBlobContainerUrl()).getHost();
} catch (MalformedURLException e) {
throw logger.logExceptionAsError(new RuntimeException(e));
}
this.changefeedCursor = new ChangefeedCursor(urlHost, this.endTime);

/* Validate the cursor. */
if (userCursor != null) {
if (userCursor.getCursorVersion() != 1) {
throw logger.logExceptionAsError(new IllegalArgumentException("Unsupported cursor version."));
}
if (!Objects.equals(urlHost, userCursor.getUrlHost())) {
throw logger.logExceptionAsError(new IllegalArgumentException("Cursor URL host does not match "
+ "container URL host."));
}
}
}

/**
@@ -72,8 +83,14 @@ class Changefeed {
Flux<BlobChangefeedEventWrapper> getEvents() {
return validateChangefeed()
.then(populateLastConsumable())
.thenMany(listYears())
.concatMap(this::listSegmentsForYear)
.flatMapMany(safeEndTime ->
listYears(safeEndTime).map(str -> Tuples.of(safeEndTime, str))
)
.concatMap(tuple2 -> {
OffsetDateTime safeEndTime = tuple2.getT1();
String year = tuple2.getT2();
return listSegmentsForYear(safeEndTime, year);
})
.concatMap(this::getEventsForSegment);
}

@@ -91,7 +108,6 @@ private Mono<Boolean> validateChangefeed() {
});
}

/* TODO (gapra) : Investigate making this thread safe. */
/**
* Populates the last consumable property from changefeed metadata.
* Log files in any segment that is dated after the date of the LastConsumable property in the
Expand All @@ -100,25 +116,24 @@ private Mono<Boolean> validateChangefeed() {
private Mono<OffsetDateTime> populateLastConsumable() {
/* We can keep the entire metadata file in memory since it is expected to only be a few hundred bytes. */
return DownloadUtils.downloadToByteArray(this.client, METADATA_SEGMENT_PATH)
.flatMap(DownloadUtils::parseJson)
/* Parse JSON for last consumable. */
.flatMap(json -> {
try {
JsonNode jsonNode = MAPPER.reader().readTree(json);
this.lastConsumable = OffsetDateTime.parse(jsonNode.get("lastConsumable").asText());
if (this.lastConsumable.isBefore(endTime)) {
this.safeEndTime = this.lastConsumable;
}
return Mono.just(this.lastConsumable);
} catch (IOException e) {
return FluxUtil.monoError(logger, new UncheckedIOException(e));
.flatMap(jsonNode -> {
/* Last consumable time. The latest time the changefeed can safely be read from.*/
OffsetDateTime lastConsumableTime = OffsetDateTime.parse(jsonNode.get("lastConsumable").asText());
/* Soonest time between lastConsumable and endTime. */
OffsetDateTime safeEndTime = this.endTime;
if (lastConsumableTime.isBefore(endTime)) {
safeEndTime = lastConsumableTime.plusHours(1); /* Add an hour since the end time is exclusive. */
}
return Mono.just(safeEndTime);
});
}

/**
* List years for which changefeed data exists.
*/
private Flux<String> listYears() {
private Flux<String> listYears(OffsetDateTime safeEndTime) {
return client.listBlobsByHierarchy(SEGMENT_PREFIX)
.map(BlobItem::getName)
.filter(yearPath -> TimeUtils.validYear(yearPath, startTime, safeEndTime));
@@ -127,7 +142,7 @@ private Flux<String> listYears() {
/**
* List segments for years of interest.
*/
private Flux<String> listSegmentsForYear(String year) {
private Flux<String> listSegmentsForYear(OffsetDateTime safeEndTime, String year) {
return client.listBlobs(new ListBlobsOptions().setPrefix(year))
.map(BlobItem::getName)
.filter(segmentPath -> TimeUtils.validSegment(segmentPath, startTime, safeEndTime));
@@ -139,12 +154,15 @@ private Flux<String> listSegmentsForYear(String year) {
private Flux<BlobChangefeedEventWrapper> getEventsForSegment(String segment) {
OffsetDateTime segmentTime = TimeUtils.convertPathToTime(segment);
/* Only pass the user cursor in to the segment of interest. */
if (userCursor != null && segmentTime.isEqual(OffsetDateTime.parse(userCursor.getSegmentTime()))) {
return segmentFactory.getSegment(segment, cfCursor.toSegmentCursor(segmentTime), userCursor)
.getEvents();
if (userCursor != null && segmentTime.isEqual(startTime)) {
return segmentFactory.getSegment(segment,
changefeedCursor.toSegmentCursor(segment, userCursor.getCurrentSegmentCursor()),
userCursor.getCurrentSegmentCursor()).getEvents();
} else {
return segmentFactory.getSegment(segment,
changefeedCursor.toSegmentCursor(segment, null),
null).getEvents();
}
return segmentFactory.getSegment(segment, cfCursor.toSegmentCursor(segmentTime), null)
.getEvents();
}

}
ChangefeedFactory.java
@@ -6,6 +6,7 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.changefeed.implementation.models.ChangefeedCursor;
import com.azure.storage.blob.changefeed.implementation.util.TimeUtils;
import com.azure.storage.common.implementation.StorageImplUtils;

import java.time.OffsetDateTime;
@@ -47,8 +48,8 @@ Changefeed getChangefeed(String cursor) {
StorageImplUtils.assertNotNull("cursor", cursor);

ChangefeedCursor userCursor = ChangefeedCursor.deserialize(cursor, logger);
OffsetDateTime start = OffsetDateTime.parse(userCursor.getSegmentTime());
OffsetDateTime end = OffsetDateTime.parse(userCursor.getEndTime());
OffsetDateTime start = TimeUtils.convertPathToTime(userCursor.getCurrentSegmentCursor().getSegmentPath());
OffsetDateTime end = userCursor.getEndTime();

return new Changefeed(this.client, start, end, userCursor, segmentFactory);
}