Skip to content

Commit

Permalink
Update error handling logic and add tests for EPC (#6978)
Browse files Browse the repository at this point in the history
* Update error handling logic and add tests for EPC

* Fix unit tests

* Add message to exceptions and make initial position exclusive

* Revert changes to make event position exclusive
  • Loading branch information
srnagar authored Dec 20, 2019
1 parent 6f3e5d9 commit f37c90c
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@
import com.azure.core.annotation.ServiceClient;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ErrorContext;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.Disposable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import reactor.core.Disposable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
* EventProcessorClient provides a convenient mechanism to consume events from all partitions of an Event Hub in the
Expand Down Expand Up @@ -54,33 +54,31 @@ public class EventProcessorClient {
* @param eventHubClientBuilder The {@link EventHubClientBuilder}.
* @param consumerGroup The consumer group name used in this event processor to consumer events.
* @param partitionProcessorFactory The factory to create new partition processor(s).
* @param initialEventPosition Initial event position to start consuming events.
* @param checkpointStore The store used for reading and updating partition ownership and checkpoints. information.
* @param trackLastEnqueuedEventProperties If set to {@code true}, all events received by this
* EventProcessorClient will also include the last enqueued event properties for it's respective partitions.
* @param tracerProvider The tracer implementation.
*/
EventProcessorClient(EventHubClientBuilder eventHubClientBuilder, String consumerGroup,
Supplier<PartitionProcessor> partitionProcessorFactory, EventPosition initialEventPosition,
CheckpointStore checkpointStore, boolean trackLastEnqueuedEventProperties, TracerProvider tracerProvider) {
Supplier<PartitionProcessor> partitionProcessorFactory, CheckpointStore checkpointStore,
boolean trackLastEnqueuedEventProperties, TracerProvider tracerProvider, Consumer<ErrorContext> processError) {

Objects.requireNonNull(eventHubClientBuilder, "eventHubClientBuilder cannot be null.");
Objects.requireNonNull(consumerGroup, "consumerGroup cannot be null.");
Objects.requireNonNull(partitionProcessorFactory, "partitionProcessorFactory cannot be null.");
Objects.requireNonNull(initialEventPosition, "initialEventPosition cannot be null.");

this.checkpointStore = Objects.requireNonNull(checkpointStore, "checkpointStore cannot be null");
this.identifier = UUID.randomUUID().toString();
logger.info("The instance ID for this event processors is {}", this.identifier);
this.partitionPumpManager = new PartitionPumpManager(checkpointStore, partitionProcessorFactory,
initialEventPosition, eventHubClientBuilder, trackLastEnqueuedEventProperties, tracerProvider);
eventHubClientBuilder, trackLastEnqueuedEventProperties, tracerProvider);
EventHubAsyncClient eventHubAsyncClient = eventHubClientBuilder.buildAsyncClient();
this.partitionBasedLoadBalancer =
new PartitionBasedLoadBalancer(this.checkpointStore, eventHubAsyncClient,
eventHubAsyncClient.getFullyQualifiedNamespace().toLowerCase(Locale.ROOT),
eventHubAsyncClient.getEventHubName().toLowerCase(Locale.ROOT),
consumerGroup.toLowerCase(Locale.ROOT), identifier, TimeUnit.MINUTES.toSeconds(1),
partitionPumpManager);
partitionPumpManager, processError);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ public EventProcessorClient buildEventProcessorClient() {

final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class));
return new EventProcessorClient(eventHubClientBuilder, this.consumerGroup,
getPartitionProcessorSupplier(), EventPosition.earliest(), checkpointStore,
trackLastEnqueuedEventProperties, tracerProvider);
getPartitionProcessorSupplier(), checkpointStore, trackLastEnqueuedEventProperties, tracerProvider,
processError);
}

private Supplier<PartitionProcessor> getPartitionProcessorSupplier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import java.util.function.Consumer;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
Expand Down Expand Up @@ -47,6 +50,8 @@ final class PartitionBasedLoadBalancer {
private final long inactiveTimeLimitInSeconds;
private final PartitionPumpManager partitionPumpManager;
private final String fullyQualifiedNamespace;
private final Consumer<ErrorContext> processError;
private final PartitionContext partitionAgnosticContext;

/**
* Creates an instance of PartitionBasedLoadBalancer for the given Event Hub name and consumer group.
Expand All @@ -60,11 +65,13 @@ final class PartitionBasedLoadBalancer {
* assuming the owner of the partition is inactive.
* @param partitionPumpManager The partition pump manager that keeps track of all EventHubConsumers and partitions
* that this {@link EventProcessorClient} is processing.
* @param processError The callback that will be called when an error occurs while running the load balancer.
*/
PartitionBasedLoadBalancer(final CheckpointStore checkpointStore,
final EventHubAsyncClient eventHubAsyncClient, final String fullyQualifiedNamespace,
final String eventHubName, final String consumerGroupName, final String ownerId,
final long inactiveTimeLimitInSeconds, final PartitionPumpManager partitionPumpManager) {
final long inactiveTimeLimitInSeconds, final PartitionPumpManager partitionPumpManager,
final Consumer<ErrorContext> processError) {
this.checkpointStore = checkpointStore;
this.eventHubAsyncClient = eventHubAsyncClient;
this.fullyQualifiedNamespace = fullyQualifiedNamespace;
Expand All @@ -73,6 +80,9 @@ final class PartitionBasedLoadBalancer {
this.ownerId = ownerId;
this.inactiveTimeLimitInSeconds = inactiveTimeLimitInSeconds;
this.partitionPumpManager = partitionPumpManager;
this.processError = processError;
this.partitionAgnosticContext = new PartitionContext(fullyQualifiedNamespace, eventHubName,
consumerGroupName, "NONE");
}

/**
Expand Down Expand Up @@ -107,8 +117,12 @@ void loadBalance() {
Mono.zip(partitionOwnershipMono, partitionsMono)
.flatMap(this::loadBalance)
// if there was an error, log warning and TODO: call user provided error handler
.doOnError(ex -> logger.warning(Messages.LOAD_BALANCING_FAILED, ex.getMessage()))
.subscribe();
.subscribe(ignored -> { },
ex -> {
logger.warning(Messages.LOAD_BALANCING_FAILED, ex.getMessage());
ErrorContext errorContext = new ErrorContext(partitionAgnosticContext, ex);
processError.accept(errorContext);
}, () -> logger.info("Load balancing completed successfully"));
}

/*
Expand Down Expand Up @@ -350,7 +364,10 @@ private void claimOwnership(final Map<String, PartitionOwnership> partitionOwner
.stream()
.forEach(po -> partitionPumpManager.startPartitionPump(po,
ownedPartitionCheckpointsTuple.getT2().get(po.getPartitionId())));
});
},
ex -> {
throw logger.logExceptionAsError(new RuntimeException("Error while listing checkpoints", ex));
});
}

private PartitionOwnership createPartitionOwnershipRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.exception.AmqpException;
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.SCOPE_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;

import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
Expand All @@ -12,28 +17,21 @@
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.CloseContext;
import com.azure.messaging.eventhubs.models.CloseReason;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.InitializationContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import reactor.core.publisher.Signal;

import java.io.Closeable;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.SCOPE_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import reactor.core.publisher.Signal;

/**
* The partition pump manager that keeps track of all the partition pumps started by this {@link EventProcessorClient}.
Expand All @@ -52,7 +50,6 @@ class PartitionPumpManager {
private final CheckpointStore checkpointStore;
private final Map<String, EventHubConsumerAsyncClient> partitionPumps = new ConcurrentHashMap<>();
private final Supplier<PartitionProcessor> partitionProcessorFactory;
private final EventPosition initialEventPosition;
private final EventHubClientBuilder eventHubClientBuilder;
private final TracerProvider tracerProvider;
private final boolean trackLastEnqueuedEventProperties;
Expand All @@ -63,21 +60,17 @@ class PartitionPumpManager {
* @param checkpointStore The partition manager that is used to store and update checkpoints.
* @param partitionProcessorFactory The partition processor factory that is used to create new instances of {@link
* PartitionProcessor} when new partition pumps are started.
* @param initialEventPosition The initial event position to use when a new partition pump is created and no
* checkpoint for the partition is available.
* @param eventHubClientBuilder The client builder used to create new clients (and new connections) for each
* partition processed by this {@link EventProcessorClient}.
* @param trackLastEnqueuedEventProperties If set to {@code true}, all events received by this
* EventProcessorClient will also include the last enqueued event properties for it's respective partitions.
* @param trackLastEnqueuedEventProperties If set to {@code true}, all events received by this EventProcessorClient
* will also include the last enqueued event properties for it's respective partitions.
* @param tracerProvider The tracer implementation.
*/
PartitionPumpManager(CheckpointStore checkpointStore,
Supplier<PartitionProcessor> partitionProcessorFactory, EventPosition initialEventPosition,
EventHubClientBuilder eventHubClientBuilder, boolean trackLastEnqueuedEventProperties,
TracerProvider tracerProvider) {
Supplier<PartitionProcessor> partitionProcessorFactory, EventHubClientBuilder eventHubClientBuilder,
boolean trackLastEnqueuedEventProperties, TracerProvider tracerProvider) {
this.checkpointStore = checkpointStore;
this.partitionProcessorFactory = partitionProcessorFactory;
this.initialEventPosition = initialEventPosition;
this.eventHubClientBuilder = eventHubClientBuilder;
this.trackLastEnqueuedEventProperties = trackLastEnqueuedEventProperties;
this.tracerProvider = tracerProvider;
Expand Down Expand Up @@ -121,12 +114,17 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi
partitionProcessor.initialize(initializationContext);

EventPosition startFromEventPosition = null;
// A checkpoint indicates the last known successfully processed event.
// So, the event position to start a new partition processing should be exclusive of the
// offset/sequence number in the checkpoint. If no checkpoint is available, start from
// the position in set in the InitializationContext (either the earliest event in the partition or
// the user provided initial position)
if (checkpoint != null && checkpoint.getOffset() != null) {
startFromEventPosition = EventPosition.fromOffset(checkpoint.getOffset());
} else if (checkpoint != null && checkpoint.getSequenceNumber() != null) {
startFromEventPosition = EventPosition.fromSequenceNumber(checkpoint.getSequenceNumber(), true);
startFromEventPosition = EventPosition.fromSequenceNumber(checkpoint.getSequenceNumber());
} else {
startFromEventPosition = initialEventPosition;
startFromEventPosition = initializationContext.getInitialPosition();
}

ReceiveOptions receiveOptions = new ReceiveOptions().setOwnerLevel(0L)
Expand All @@ -151,42 +149,27 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi
partitionProcessor.processEvent(new EventContext(partitionContext, eventData, checkpointStore,
partitionEvent.getLastEnqueuedEventProperties()));
endProcessTracingSpan(processSpanContext, Signal.complete());
} catch (Exception ex) {
/* event processing threw an exception */
handleProcessingError(claimedOwnership, partitionProcessor, ex, partitionContext);
endProcessTracingSpan(processSpanContext, Signal.error(ex));

} catch (Throwable throwable) {
/* user code for event processing threw an exception - log and bubble up */
endProcessTracingSpan(processSpanContext, Signal.error(throwable));
throw logger.logExceptionAsError(new RuntimeException("Error in event processing callback",
throwable));
}
}, /* EventHubConsumer receive() returned an error */
ex -> handleReceiveError(claimedOwnership, eventHubConsumer, partitionProcessor, ex, partitionContext),
() -> partitionProcessor.close(new CloseContext(partitionContext,
CloseReason.EVENT_PROCESSOR_SHUTDOWN)));
// @formatter:on
}

private void handleProcessingError(PartitionOwnership claimedOwnership, PartitionProcessor partitionProcessor,
Throwable error, PartitionContext partitionContext) {
try {
// There was an error in process event (user provided code), call process error and if that
// also fails just log and continue
partitionProcessor.processError(new ErrorContext(partitionContext, error));
} catch (Exception ex) {
logger.warning(Messages.FAILED_WHILE_PROCESSING_ERROR, claimedOwnership.getPartitionId(), ex);
}
// @formatter:on
}

private void handleReceiveError(PartitionOwnership claimedOwnership, EventHubConsumerAsyncClient eventHubConsumer,
PartitionProcessor partitionProcessor, Throwable error, PartitionContext partitionContext) {
PartitionProcessor partitionProcessor, Throwable throwable, PartitionContext partitionContext) {
try {
logger.warning("Error receiving events for partition {}", partitionContext.getPartitionId(), throwable);
// if there was an error on receive, it also marks the end of the event data stream
partitionProcessor.processError(new ErrorContext(partitionContext, error));
CloseReason closeReason = CloseReason.EVENT_HUB_EXCEPTION;
// If the exception indicates that the partition was stolen (i.e some other consumer with same ownerlevel
// started consuming the partition), update the closeReason
// TODO: Find right exception type to determine stolen partition
if (error instanceof AmqpException) {
closeReason = CloseReason.LOST_PARTITION_OWNERSHIP;
}
partitionProcessor.processError(new ErrorContext(partitionContext, throwable));
// Any exception while receiving events will result in the processor losing ownership
CloseReason closeReason = CloseReason.LOST_PARTITION_OWNERSHIP;
partitionProcessor.close(new CloseContext(partitionContext, closeReason));
} catch (Exception ex) {
logger.warning(Messages.FAILED_PROCESSING_ERROR_RECEIVE, claimedOwnership.getPartitionId(), ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,4 @@ public enum CloseReason {
* PartitionProcessor#close(CloseContext)} will be called with this reason.
*/
EVENT_PROCESSOR_SHUTDOWN,

/**
* If a non-retryable exception occured when receiving events from Event Hub, this reason will be provided when
* {@link PartitionProcessor#close(CloseContext)} is called.
*/
EVENT_HUB_EXCEPTION
}
Loading

0 comments on commit f37c90c

Please sign in to comment.