Skip to content

Commit

Permalink
feat: restart interrupted data flows
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Nov 11, 2024
1 parent 858d09b commit 842d0f3
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -49,6 +50,7 @@ public class EmbeddedRuntime extends BaseRuntime {
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final MultiSourceServiceLocator serviceLocator;
private final URL[] classPathEntries;
private Future<?> runtimeThread;

public EmbeddedRuntime(String name, Map<String, String> properties, String... additionalModules) {
this(new MultiSourceServiceLocator(), name, properties, ClasspathReader.classpathFor(additionalModules));
Expand Down Expand Up @@ -78,7 +80,7 @@ public void boot(boolean addShutdownHook) {
var runtimeException = new AtomicReference<Exception>();
var latch = new CountDownLatch(1);

executorService.execute(() -> {
runtimeThread = executorService.submit(() -> {
try {
var classLoader = URLClassLoader.newInstance(classPathEntries);

Expand Down Expand Up @@ -110,6 +112,9 @@ public void boot(boolean addShutdownHook) {
public void shutdown() {
serviceLocator.clearSystemExtensions();
super.shutdown();
if (runtimeThread != null && !runtimeThread.isDone()) {
runtimeThread.cancel(true);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
public class StateMachineManager {

private final List<Processor> processors = new ArrayList<>();
private final List<Processor> startupProcessors = new ArrayList<>();
private final ScheduledExecutorService executor;
private final AtomicBoolean active = new AtomicBoolean();
private final WaitStrategy waitStrategy;
Expand All @@ -65,6 +66,7 @@ private StateMachineManager(String name, Monitor monitor, ExecutorInstrumentatio
*/
public Future<?> start() {
active.set(true);
performStartupLogic();
return scheduleNextIterationIn(0L);
}

Expand Down Expand Up @@ -103,6 +105,16 @@ private Runnable loop() {
};
}

private void performStartupLogic() {
for (var startupProcessor : startupProcessors) {
try {
startupProcessor.process();
} catch (Throwable e) {
monitor.severe(format("StateMachineManager [%s] startup error caught", name), e);
}
}
}

private void performLogic() {
try {
var processed = processors.stream()
Expand Down Expand Up @@ -150,6 +162,17 @@ public Builder shutdownTimeout(int seconds) {
return this;
}

/**
* Register a processor that will run once at startup before the regular processors.
*
* @param startupProcessor the processor.
* @return the builder.
*/
public Builder startupProcessor(Processor startupProcessor) {
loop.startupProcessors.add(startupProcessor);
return this;
}

public StateMachineManager build() {
return loop;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
Expand All @@ -45,7 +47,7 @@ void setUp() {
}

@Test
void shouldExecuteProcessorsAsyncAndCanBeStopped() throws InterruptedException {
void shouldExecuteProcessorsAsyncAndCanBeStopped() {
var processor = mock(Processor.class);
when(processor.process()).thenAnswer(i -> {
Thread.sleep(100L);
Expand All @@ -67,12 +69,10 @@ void shouldExecuteProcessorsAsyncAndCanBeStopped() throws InterruptedException {
}

@Test
void shouldNotWaitForSomeTimeIfTheresAtLeastOneProcessedEntity() throws InterruptedException {
void shouldNotWaitForSomeTimeIfTheresAtLeastOneProcessedEntity() {
var processor = mock(Processor.class);
when(processor.process()).thenReturn(1L);
doAnswer(i -> {
return 1L;
}).when(waitStrategy).success();
doAnswer(i -> 1L).when(waitStrategy).success();
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.processor(processor)
.build();
Expand All @@ -86,13 +86,11 @@ void shouldNotWaitForSomeTimeIfTheresAtLeastOneProcessedEntity() throws Interrup
}

@Test
void shouldWaitForSomeTimeIfNoEntityIsProcessed() throws InterruptedException {
void shouldWaitForSomeTimeIfNoEntityIsProcessed() {
var processor = mock(Processor.class);
when(processor.process()).thenReturn(0L);
var waitStrategy = mock(WaitStrategy.class);
doAnswer(i -> {
return 0L;
}).when(waitStrategy).waitForMillis();
doAnswer(i -> 0L).when(waitStrategy).waitForMillis();
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.processor(processor)
.build();
Expand All @@ -118,12 +116,10 @@ void shouldExitWithAnExceptionIfProcessorExitsWithAnUnrecoverableError() {
}

@Test
void shouldWaitRetryTimeWhenAnExceptionIsThrownByAnProcessor() throws InterruptedException {
void shouldWaitRetryTimeWhenAnExceptionIsThrownByAnProcessor() {
var processor = mock(Processor.class);
when(processor.process()).thenThrow(new EdcException("exception")).thenReturn(0L);
when(waitStrategy.retryInMillis()).thenAnswer(i -> {
return 1L;
});
when(waitStrategy.retryInMillis()).thenAnswer(i -> 1L);
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.processor(processor)
.build();
Expand All @@ -135,4 +131,33 @@ void shouldWaitRetryTimeWhenAnExceptionIsThrownByAnProcessor() throws Interrupte
verify(waitStrategy).retryInMillis();
});
}

@Test
void shouldExecuteStartupProcessorOnce() {
var processor = mock(Processor.class);
when(processor.process()).thenAnswer(i -> {
Thread.sleep(100L);
return 1L;
});
var startupProcessor = mock(Processor.class);
when(startupProcessor.process()).thenAnswer(i -> {
Thread.sleep(100L);
return 1L;
});
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.startupProcessor(startupProcessor)
.processor(processor)
.shutdownTimeout(1)
.build();

stateMachine.start();

await().untilAsserted(() -> {
verify(processor, atLeast(2)).process();
verify(startupProcessor, only()).process();

assertThat(stateMachine.stop()).succeedsWithin(2, SECONDS);
verifyNoMoreInteractions(processor);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public void shutdown() {
if (dataPlaneManager != null) {
dataPlaneManager.stop();
}
pipelineService.closeAll();
}

@Provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public StatusResult<Void> terminate(String dataFlowId, @Nullable String reason)
@Override
protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) {
return builder
.startupProcessor(processDataFlowInState(STARTED, this::restartFlow))
.processor(processDataFlowInState(RECEIVED, this::processReceived))
.processor(processDataFlowInState(COMPLETED, this::processCompleted))
.processor(processDataFlowInState(FAILED, this::processFailed));
Expand Down Expand Up @@ -192,6 +193,11 @@ private Result<DataFlowResponseMessage> handlePush(DataFlowStartMessage startMes
.build());
}

private boolean restartFlow(DataFlow dataFlow) {
dataFlow.transitToReceived();
return processReceived(dataFlow);
}

private boolean processReceived(DataFlow dataFlow) {
var request = dataFlow.toRequest();
var transferService = transferServiceRegistry.resolveTransferService(request);
Expand All @@ -206,7 +212,7 @@ private boolean processReceived(DataFlow dataFlow) {
store.save(dataFlow);

return entityRetryProcessFactory.doAsyncProcess(dataFlow, () -> transferService.transfer(request))
.entityRetrieve(id -> store.findById(id))
.entityRetrieve(id -> store.findByIdAndLease(id).orElse(f -> null))
.onSuccess((f, r) -> {
if (f.getState() != STARTED.code()) {
return;
Expand Down Expand Up @@ -297,6 +303,7 @@ public Builder authorizationService(DataPlaneAuthorizationService authorizationS
manager.authorizationService = authorizationService;
return this;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ public StreamResult<Void> terminate(DataFlow dataFlow) {
return terminate(dataFlow.getId());
}

@Override
public void closeAll() {
sources.forEach((processId, source) -> terminate(processId));
}

@Override
public void registerFactory(DataSourceFactory factory) {
sourceFactories.add(factory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.eclipse.edc.connector.dataplane.framework;

import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceRegistryImpl;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
Expand All @@ -24,12 +25,17 @@
import org.junit.jupiter.api.extension.ExtendWith;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

@ExtendWith(DependencyInjectionExtension.class)
class DataPlaneFrameworkExtensionTest {

private final PipelineService pipelineService = mock();

@BeforeEach
public void setUp(ServiceExtensionContext context) {
context.registerService(PipelineService.class, pipelineService);
context.registerService(ExecutorInstrumentation.class, ExecutorInstrumentation.noop());
}

Expand All @@ -40,4 +46,10 @@ void initialize_registers_transferService(ServiceExtensionContext context, DataP
assertThat(context.getService(TransferServiceRegistry.class)).isInstanceOf(TransferServiceRegistryImpl.class);
}

@Test
void shouldClosePipelineService_whenShutdown(DataPlaneFrameworkExtension extension) {
extension.shutdown();

verify(pipelineService).closeAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ class Received {
void shouldStartTransferTransitionAndTransitionToStarted() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(dataFlow);
when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(dataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.canHandle(any())).thenReturn(true);
when(transferService.transfer(any())).thenReturn(new CompletableFuture<>());
Expand All @@ -373,7 +373,7 @@ void shouldStartTransferTransitionAndTransitionToStarted() {
void shouldStarTransitionToCompleted_whenTransferSucceeds() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(dataFlow);
when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(dataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.canHandle(any())).thenReturn(true);
when(transferService.transfer(any())).thenReturn(completedFuture(StreamResult.success()));
Expand All @@ -391,7 +391,7 @@ void shouldStartTransferAndNotTransitionToCompleted_whenTransferSucceedsBecauseI
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
var terminatedDataFlow = dataFlowBuilder().state(TERMINATED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(terminatedDataFlow);
when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(terminatedDataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.canHandle(any())).thenReturn(true);
when(transferService.transfer(any())).thenReturn(completedFuture(StreamResult.success()));
Expand All @@ -409,7 +409,7 @@ void shouldNotChangeState_whenTransferGetsSuspended() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
var terminatedDataFlow = dataFlowBuilder().state(SUSPENDED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(terminatedDataFlow);
when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(terminatedDataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.canHandle(any())).thenReturn(true);
when(transferService.transfer(any())).thenReturn(completedFuture(StreamResult.success()));
Expand All @@ -426,7 +426,7 @@ void shouldNotChangeState_whenTransferGetsSuspended() {
void shouldStartTransferAndTransitionToFailed_whenTransferFails() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(dataFlow);
when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(dataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.canHandle(any())).thenReturn(true);
when(transferService.transfer(any())).thenReturn(completedFuture(StreamResult.error("an error")));
Expand All @@ -443,7 +443,7 @@ void shouldStartTransferAndTransitionToFailed_whenTransferFails() {
void shouldStartTransferAndTransitionToReceivedForRetrying_whenTransferFutureIsFailed() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(dataFlow);
when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(dataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.canHandle(any())).thenReturn(true);
when(transferService.transfer(any())).thenReturn(failedFuture(new RuntimeException("an error")));
Expand All @@ -460,7 +460,7 @@ void shouldStartTransferAndTransitionToReceivedForRetrying_whenTransferFutureIsF
void shouldTransitToFailedIfNoTransferServiceCanHandleStarted() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(dataFlow);
when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(dataFlow));
when(registry.resolveTransferService(any())).thenReturn(null);

manager.start();
Expand Down Expand Up @@ -565,6 +565,25 @@ void shouldStillSuspend_whenDataFlowHasNoSource() {
}
}

@Nested
class RestartFlowsAtStartup {
@Test
void shouldRestartFlowsAtStartup() {
var dataFlow = dataFlowBuilder().state(STARTED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(STARTED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.canHandle(any())).thenReturn(true);
when(transferService.transfer(any())).thenReturn(new CompletableFuture<>());

manager.start();

await().untilAsserted(() -> {
verify(transferService).transfer(isA(DataFlowStartMessage.class));
verify(store).save(argThat(it -> it.getState() == STARTED.code()));
});
}
}

private DataFlow.Builder dataFlowBuilder() {
return DataFlow.Builder.newInstance()
.source(DataAddress.Builder.newInstance().type("source").build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,25 @@ void shouldReturnSinkTypesFromFactories() {

}

@Nested
class CloseAll {

@Test
void shouldCloseAllTheOngoingDataFlows() throws Exception {
when(sourceFactory.supportedType()).thenReturn("source");
when(sourceFactory.createSource(any())).thenReturn(source);
when(sinkFactory.supportedType()).thenReturn("destination");
when(sinkFactory.createSink(any())).thenReturn(sink);
when(sink.transfer(any())).thenReturn(new CompletableFuture<>());

service.transfer(dataFlow("source", "destination").toRequest());

service.closeAll();

verify(source).close();
}
}

private DataFlow dataFlow(String sourceType, String destinationType) {
return DataFlow.Builder.newInstance()
.id("1")
Expand Down
Loading

0 comments on commit 842d0f3

Please sign in to comment.