Updated Changefeed to be able to read last consumable. Also unified cursor. #13979

Merged 33 commits on Aug 13, 2020

Commits
- `1902044` Added support for writing, serializing and deserializing new cursor (gapra-msft, Aug 4, 2020)
- `93deb85` Added support to read from new cursor (gapra-msft, Aug 4, 2020)
- `608fc28` Removed all references to old cursor (gapra-msft, Aug 4, 2020)
- `1be6ff3` Added tests for ChangefeedCursor (gapra-msft, Aug 4, 2020)
- `fe4ca41` Modified BlobChangefeedPagedFlux tests (gapra-msft, Aug 5, 2020)
- `71b4140` Fixed bug where last consumable would not be included in iteration (gapra-msft, Aug 5, 2020)
- `6425bac` In the middle of changing chunk test (gapra-msft, Aug 5, 2020)
- `969e821` modified chunk test (gapra-msft, Aug 6, 2020)
- `806082d` Added more tests for chunk (gapra-msft, Aug 6, 2020)
- `2de1c43` Added more test for Chunk (gapra-msft, Aug 6, 2020)
- `a43a720` Added more chunk tests and added code to check for cursor version (gapra-msft, Aug 6, 2020)
- `92e3f55` Modified some ShardTest (gapra-msft, Aug 6, 2020)
- `ec202d9` Added more tests (gapra-msft, Aug 6, 2020)
- `8e35e1b` modifying tests (gapra-msft, Aug 7, 2020)
- `db241c8` Updated segment test (gapra-msft, Aug 7, 2020)
- `a98e222` Modified Changefeed test (gapra-msft, Aug 7, 2020)
- `eba82b7` Added prints (gapra-msft, Aug 7, 2020)
- `11ce4ef` Added more tests (gapra-msft, Aug 10, 2020)
- `81aac47` Added code for test (gapra-msft, Aug 10, 2020)
- `38a1647` Added some new recorded tests (gapra-msft, Aug 10, 2020)
- `5c47ebd` Added all CF tests (gapra-msft, Aug 11, 2020)
- `dcd26e3` Added comment (gapra-msft, Aug 11, 2020)
- `f175ed7` Modified to use url host instead of url hash (gapra-msft, Aug 11, 2020)
- `90c418d` Optimized imports (gapra-msft, Aug 11, 2020)
- `c220bd6` Modified pom (gapra-msft, Aug 11, 2020)
- `6fa06c7` Added code to pass Analyze step (gapra-msft, Aug 11, 2020)
- `787e9ee` Added code to change cursor starting point (gapra-msft, Aug 12, 2020)
- `f3e3b63` Updated AvroParser logic to live in BlockSchema (gapra-msft, Aug 12, 2020)
- `7a7df86` Cleaned up more code (gapra-msft, Aug 12, 2020)
- `1b9ee84` Removed comment (gapra-msft, Aug 12, 2020)
- `31b8801` Fixed some more stuff (gapra-msft, Aug 12, 2020)
- `149da3f` Added readme sample (gapra-msft, Aug 12, 2020)
- `e0e00ca` renamed eventIndex to objectIndex (gapra-msft, Aug 13, 2020)
49 changes: 45 additions & 4 deletions sdk/storage/azure-storage-blob-changefeed/README.md
@@ -118,25 +118,26 @@ tasks, including:
- [Get events](#get-events)
- [Get events between a start and end time](#get-events-start-end)
- [Resume with a cursor](#get-events-cursor)
- [Poll for events with a cursor](#poll-events-cursor)

### Create a `BlobChangefeedClient`

<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L23-L23 -->
<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L26-L26 -->
```java
client = new BlobChangefeedClientBuilder(blobServiceClient).buildClient();
```

### Get events

<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L27-L28 -->
<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L30-L31 -->
```java
client.getEvents().forEach(event ->
System.out.printf("Topic: %s, Subject: %s%n", event.getTopic(), event.getSubject()));
```

### Get events between a start and end time

<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L32-L36 -->
<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L35-L39 -->
```java
OffsetDateTime startTime = OffsetDateTime.MIN;
OffsetDateTime endTime = OffsetDateTime.now();
@@ -147,7 +148,7 @@ client.getEvents(startTime, endTime).forEach(event ->

### Resume with a cursor

<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L40-L56 -->
<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L43-L59 -->
```java
BlobChangefeedPagedIterable iterable = client.getEvents();
Iterable<BlobChangefeedPagedResponse> pages = iterable.iterableByPage();
Expand All @@ -168,6 +169,46 @@ client.getEvents(cursor).forEach(event ->
System.out.printf("Topic: %s, Subject: %s%n", event.getTopic(), event.getSubject()));
```
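The cursor returned by `getContinuationToken()` is an opaque string, so persisting it between runs is plain string storage. Below is a minimal sketch under the assumption that a local file is an acceptable store; the `CursorStore` class and file name are illustrative, not part of the SDK:

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class CursorStore {
    /* Illustrative location; any durable store (a blob, a database row) works equally well. */
    private static final Path CURSOR_FILE = Paths.get("changefeed-cursor.txt");

    /* Save the opaque continuation token returned by page.getContinuationToken(). */
    static void save(String cursor) throws Exception {
        Files.write(CURSOR_FILE, cursor.getBytes(StandardCharsets.UTF_8));
    }

    /* Load the previously saved token, or null on a first run. */
    static String load() throws Exception {
        return Files.exists(CURSOR_FILE)
            ? new String(Files.readAllBytes(CURSOR_FILE), StandardCharsets.UTF_8)
            : null;
    }
}
```

On startup, a `null` result from `load()` would mean starting from `getEvents(startTime, endTime)` rather than `getEvents(cursor)`.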

### Poll for events with a cursor

<!-- embedme ./src/samples/java/com/azure/storage/blob/changefeed/ReadmeSamples.java#L63-L96 -->
```java
List<BlobChangefeedEvent> changefeedEvents = new ArrayList<BlobChangefeedEvent>();

/* Get the start time. The change feed client will round start time down to the nearest hour if you provide
an OffsetDateTime with minutes and seconds. */
OffsetDateTime startTime = OffsetDateTime.now();

/* Get your polling interval. */
long pollingInterval = 1000 * 60 * 5; /* 5 minutes. */

/* Get initial set of events. */
Iterable<BlobChangefeedPagedResponse> pages = client.getEvents(startTime, null).iterableByPage();

String continuationToken = null;

while (true) {
for (BlobChangefeedPagedResponse page : pages) {
changefeedEvents.addAll(page.getValue());
/*
 * Get the change feed cursor. The cursor is not required to get each page of events;
 * it is intended to be saved and used to resume iterating at a later date.
*/
continuationToken = page.getContinuationToken();
}

/* Wait before processing next batch of events. */
try {
Thread.sleep(pollingInterval);
} catch (InterruptedException e) {
e.printStackTrace();
}

/* Resume from last continuation token and fetch latest set of events. */
pages = client.getEvents(continuationToken).iterableByPage();
}
```
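`Thread.sleep` in the loop above blocks the calling thread; a `ScheduledExecutorService` is a common non-blocking alternative. The sketch below shows only the scheduling pattern, with the changefeed fetch replaced by a placeholder `Runnable` (`ChangefeedPoller` and its method names are illustrative, not part of the SDK):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ChangefeedPoller {
    /* Runs pollOnce immediately, then again every intervalMillis, until maxRuns runs complete. */
    static int poll(Runnable pollOnce, long intervalMillis, int maxRuns) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(maxRuns);
        scheduler.scheduleWithFixedDelay(() -> {
            pollOnce.run(); // placeholder for: fetch pages, process events, save the new token
            remaining.countDown();
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
        remaining.await();
        scheduler.shutdownNow();
        return maxRuns;
    }
}
```

In a real poller the interval would be minutes rather than milliseconds, and the loop would run until shutdown instead of a fixed `maxRuns`.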

## Troubleshooting
When interacting with blobs using this Java client library, errors returned by the service correspond to the same HTTP
status codes returned for [REST API][error_codes] requests. For example, if you try to retrieve a container or blob that
@@ -7,8 +7,8 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.paging.ContinuablePage;
import com.azure.core.util.paging.ContinuablePagedFlux;
import com.azure.storage.blob.changefeed.implementation.models.BlobChangefeedEventWrapper;
import com.azure.storage.blob.changefeed.implementation.models.ChangefeedCursor;
import com.azure.storage.blob.changefeed.implementation.models.BlobChangefeedEventWrapper;
import com.azure.storage.blob.changefeed.models.BlobChangefeedEvent;
import com.azure.storage.common.implementation.StorageImplUtils;
import reactor.core.CoreSubscriber;
@@ -27,10 +27,7 @@ public final class BlobChangefeedPagedFlux extends ContinuablePagedFlux<String,

private final ClientLogger logger = new ClientLogger(BlobChangefeedPagedFlux.class);

private final ChangefeedFactory changefeedFactory;
private final OffsetDateTime startTime;
private final OffsetDateTime endTime;
private final String cursor;
private final Changefeed changefeed;

private static final Integer DEFAULT_PAGE_SIZE = 5000;

@@ -39,21 +36,15 @@ public final class BlobChangefeedPagedFlux extends ContinuablePagedFlux<String,
*/
BlobChangefeedPagedFlux(ChangefeedFactory changefeedFactory, OffsetDateTime startTime, OffsetDateTime endTime) {
StorageImplUtils.assertNotNull("changefeedFactory", changefeedFactory);
this.changefeedFactory = changefeedFactory;
this.startTime = startTime;
this.endTime = endTime;
this.cursor = null;
this.changefeed = changefeedFactory.getChangefeed(startTime, endTime);
}

/**
* Creates an instance of {@link BlobChangefeedPagedFlux}.
*/
BlobChangefeedPagedFlux(ChangefeedFactory changefeedFactory, String cursor) {
StorageImplUtils.assertNotNull("changefeedFactory", changefeedFactory);
this.changefeedFactory = changefeedFactory;
this.startTime = null;
this.endTime = null;
this.cursor = cursor;
this.changefeed = changefeedFactory.getChangefeed(cursor);
}

@Override
@@ -95,22 +86,15 @@ public Flux<BlobChangefeedPagedResponse> byPage(int preferredPageSize) {
public Flux<BlobChangefeedPagedResponse> byPage(String continuationToken, int preferredPageSize) {

if (continuationToken != null) {
return FluxUtil.fluxError(logger, new UnsupportedOperationException("continuationToken not supported. Use "
+ "client.getEvents(String) to pass in a cursor."));
return FluxUtil.pagedFluxError(logger, new UnsupportedOperationException("continuationToken not "
+ "supported. Use client.getEvents(String) to pass in a cursor."));
}
if (preferredPageSize <= 0) {
return FluxUtil.fluxError(logger, new IllegalArgumentException("preferredPageSize > 0 required but "
return FluxUtil.pagedFluxError(logger, new IllegalArgumentException("preferredPageSize > 0 required but "
+ "provided: " + preferredPageSize));
}
preferredPageSize = Integer.min(preferredPageSize, DEFAULT_PAGE_SIZE);

Changefeed changefeed;
if (cursor != null) {
changefeed = changefeedFactory.getChangefeed(cursor);
} else {
changefeed = changefeedFactory.getChangefeed(startTime, endTime);
}

return changefeed.getEvents()
/* Window the events to the page size. This takes the Flux<BlobChangefeedEventWrapper> and
transforms it into a Flux<Flux<BlobChangefeedEventWrapper>>, where the internal Fluxes can have at most
@@ -139,8 +123,7 @@ public Flux<BlobChangefeedPagedResponse> byPage(String continuationToken, int pr

@Override
public void subscribe(CoreSubscriber<? super BlobChangefeedEvent> coreSubscriber) {
byPage(null, DEFAULT_PAGE_SIZE)
.flatMap((page) -> Flux.fromIterable(page.getElements()))
changefeed.getEvents().map(BlobChangefeedEventWrapper::getEvent)
.subscribe(coreSubscriber);
}
}
@@ -51,7 +51,8 @@ public Flux<ByteBuffer> download() {
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> downloadFunc = (range, conditions)
-> client.downloadWithResponse(range, null, conditions, false);

return ChunkedDownloadUtils.downloadFirstChunk(range, options, requestConditions, downloadFunc)
/* We don't etag lock since the Changefeed can append to the blob while we are reading it. */
return ChunkedDownloadUtils.downloadFirstChunk(range, options, requestConditions, downloadFunc, false)
.flatMapMany(setupTuple3 -> {
long newCount = setupTuple3.getT1();
BlobRequestConditions finalConditions = setupTuple3.getT2();
@@ -12,14 +12,14 @@
import com.azure.storage.blob.changefeed.implementation.util.TimeUtils;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.OffsetDateTime;
import java.util.Objects;

/**
* A class that represents a Changefeed.
@@ -38,15 +38,11 @@ class Changefeed {

private static final String SEGMENT_PREFIX = "idx/segments/";
private static final String METADATA_SEGMENT_PATH = "meta/segments.json";
private static final ObjectMapper MAPPER = new ObjectMapper();

private final BlobContainerAsyncClient client; /* Changefeed container */
private final OffsetDateTime startTime; /* User provided start time. */
private final OffsetDateTime endTime; /* User provided end time. */
private OffsetDateTime lastConsumable; /* Last consumable time. The latest time the changefeed can safely be
read from.*/
private OffsetDateTime safeEndTime; /* Soonest time between lastConsumable and endTime. */
private final ChangefeedCursor cfCursor; /* Cursor associated with changefeed. */
private final ChangefeedCursor changefeedCursor; /* Cursor associated with changefeed. */
private final ChangefeedCursor userCursor; /* User provided cursor. */
private final SegmentFactory segmentFactory; /* Segment factory. */

@@ -56,13 +52,28 @@ class Changefeed {
Changefeed(BlobContainerAsyncClient client, OffsetDateTime startTime, OffsetDateTime endTime,
ChangefeedCursor userCursor, SegmentFactory segmentFactory) {
this.client = client;
this.startTime = startTime;
this.endTime = endTime;
this.startTime = TimeUtils.roundDownToNearestHour(startTime);
this.endTime = TimeUtils.roundUpToNearestHour(endTime);
this.userCursor = userCursor;
this.segmentFactory = segmentFactory;

this.cfCursor = new ChangefeedCursor(this.endTime);
this.safeEndTime = endTime;
String urlHost = null;
try {
urlHost = new URL(client.getBlobContainerUrl()).getHost();
} catch (MalformedURLException e) {
throw logger.logExceptionAsError(new RuntimeException(e));
}
this.changefeedCursor = new ChangefeedCursor(urlHost, this.endTime);

/* Validate the cursor. */
if (userCursor != null) {
if (userCursor.getCursorVersion() != 1) {
throw logger.logExceptionAsError(new IllegalArgumentException("Unsupported cursor version."));
}
if (!Objects.equals(urlHost, userCursor.getUrlHost())) {
throw logger.logExceptionAsError(new IllegalArgumentException("Cursor URL host does not match "
+ "container URL host."));
}
}
}
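The host comparison in the constructor above exists because a cursor minted against one storage account should not be replayed against another. The extraction itself is plain `java.net.URL`; a standalone sketch of that one step (the account name is made up):

```java
import java.net.MalformedURLException;
import java.net.URL;

public class UrlHostExample {
    /* Mirrors the constructor above: pull the host out of a container URL. */
    static String hostOf(String containerUrl) {
        try {
            return new URL(containerUrl).getHost();
        } catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
    }
}
```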

/**
@@ -72,8 +83,14 @@ class Changefeed {
Flux<BlobChangefeedEventWrapper> getEvents() {
return validateChangefeed()
.then(populateLastConsumable())
.thenMany(listYears())
.concatMap(this::listSegmentsForYear)
.flatMapMany(safeEndTime ->
listYears(safeEndTime).map(str -> Tuples.of(safeEndTime, str))
)
.concatMap(tuple2 -> {
OffsetDateTime safeEndTime = tuple2.getT1();
String year = tuple2.getT2();
return listSegmentsForYear(safeEndTime, year);
})
.concatMap(this::getEventsForSegment);
}

@@ -91,7 +108,6 @@ private Mono<Boolean> validateChangefeed() {
});
}

/* TODO (gapra) : Investigate making this thread safe. */
/**
* Populates the last consumable property from changefeed metadata.
* Log files in any segment that is dated after the date of the LastConsumable property in the
@@ -100,25 +116,24 @@ private Mono<Boolean> validateChangefeed() {
private Mono<OffsetDateTime> populateLastConsumable() {
/* We can keep the entire metadata file in memory since it is expected to only be a few hundred bytes. */
return DownloadUtils.downloadToByteArray(this.client, METADATA_SEGMENT_PATH)
.flatMap(DownloadUtils::parseJson)
/* Parse JSON for last consumable. */
.flatMap(json -> {
try {
JsonNode jsonNode = MAPPER.reader().readTree(json);
this.lastConsumable = OffsetDateTime.parse(jsonNode.get("lastConsumable").asText());
if (this.lastConsumable.isBefore(endTime)) {
this.safeEndTime = this.lastConsumable;
}
return Mono.just(this.lastConsumable);
} catch (IOException e) {
return FluxUtil.monoError(logger, new UncheckedIOException(e));
.flatMap(jsonNode -> {
/* Last consumable time. The latest time the changefeed can safely be read from.*/
OffsetDateTime lastConsumableTime = OffsetDateTime.parse(jsonNode.get("lastConsumable").asText());
/* Soonest time between lastConsumable and endTime. */
OffsetDateTime safeEndTime = this.endTime;
if (lastConsumableTime.isBefore(endTime)) {
safeEndTime = lastConsumableTime.plusHours(1); /* Add an hour since end time is non inclusive. */
}
return Mono.just(safeEndTime);
});
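The branch above picks the sooner of the user's end time and the point the service has marked consumable, widened by an hour because the end bound is exclusive. Isolated as a pure function for illustration (the class and method names are made up, not SDK API):

```java
import java.time.OffsetDateTime;

public class SafeEndTime {
    /* Sooner of lastConsumable (+1h, since the end bound is non-inclusive) and the requested endTime. */
    static OffsetDateTime of(OffsetDateTime lastConsumable, OffsetDateTime endTime) {
        return lastConsumable.isBefore(endTime) ? lastConsumable.plusHours(1) : endTime;
    }
}
```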
}

/**
* List years for which changefeed data exists.
*/
private Flux<String> listYears() {
private Flux<String> listYears(OffsetDateTime safeEndTime) {
return client.listBlobsByHierarchy(SEGMENT_PREFIX)
.map(BlobItem::getName)
.filter(yearPath -> TimeUtils.validYear(yearPath, startTime, safeEndTime));
@@ -127,7 +142,7 @@ private Flux<String> listYears() {
/**
* List segments for years of interest.
*/
private Flux<String> listSegmentsForYear(String year) {
private Flux<String> listSegmentsForYear(OffsetDateTime safeEndTime, String year) {
return client.listBlobs(new ListBlobsOptions().setPrefix(year))
.map(BlobItem::getName)
.filter(segmentPath -> TimeUtils.validSegment(segmentPath, startTime, safeEndTime));
@@ -139,12 +154,15 @@ private Flux<String> listSegmentsForYear(String year) {
private Flux<BlobChangefeedEventWrapper> getEventsForSegment(String segment) {
OffsetDateTime segmentTime = TimeUtils.convertPathToTime(segment);
/* Only pass the user cursor in to the segment of interest. */
if (userCursor != null && segmentTime.isEqual(OffsetDateTime.parse(userCursor.getSegmentTime()))) {
return segmentFactory.getSegment(segment, cfCursor.toSegmentCursor(segmentTime), userCursor)
.getEvents();
if (userCursor != null && segmentTime.isEqual(startTime)) {
return segmentFactory.getSegment(segment,
changefeedCursor.toSegmentCursor(segment, userCursor.getCurrentSegmentCursor()),
userCursor.getCurrentSegmentCursor()).getEvents();
} else {
return segmentFactory.getSegment(segment,
changefeedCursor.toSegmentCursor(segment, null),
null).getEvents();
}
return segmentFactory.getSegment(segment, cfCursor.toSegmentCursor(segmentTime), null)
.getEvents();
}

}
@@ -6,6 +6,7 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.changefeed.implementation.models.ChangefeedCursor;
import com.azure.storage.blob.changefeed.implementation.util.TimeUtils;
import com.azure.storage.common.implementation.StorageImplUtils;

import java.time.OffsetDateTime;
@@ -47,8 +48,8 @@ Changefeed getChangefeed(String cursor) {
StorageImplUtils.assertNotNull("cursor", cursor);

ChangefeedCursor userCursor = ChangefeedCursor.deserialize(cursor, logger);
OffsetDateTime start = OffsetDateTime.parse(userCursor.getSegmentTime());
OffsetDateTime end = OffsetDateTime.parse(userCursor.getEndTime());
OffsetDateTime start = TimeUtils.convertPathToTime(userCursor.getCurrentSegmentCursor().getSegmentPath());
OffsetDateTime end = userCursor.getEndTime();

return new Changefeed(this.client, start, end, userCursor, segmentFactory);
}