From a482452f8f262a7b7963a947773f6b2c61d7204e Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Fri, 1 Jul 2022 15:58:30 -0700 Subject: [PATCH] Properly update the hasEmitted state --- .../workers/internal/EmptyAirbyteSource.java | 1 + .../workers/internal/EmptyAirbyteSourceTest.java | 14 ++++++++++++++ 2 files changed, 15 insertions(+) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java index 8c342cecb3cc..747f9eb4cac6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java @@ -150,6 +150,7 @@ private Optional emitStreamState() { final StreamDescriptor streamDescriptor = streamsToReset.poll(); return Optional.of(getNullStreamStateMessage(streamDescriptor)); } else { + hasEmittedState.compareAndSet(false, true); return Optional.empty(); } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java index 928a9ee9995e..5c5d90442e61 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java @@ -181,6 +181,8 @@ public void testGlobalPartial() throws Exception { Assertions.assertThat(emptyAirbyteSource.attemptRead()) .isEmpty(); + + Assertions.assertThat(emptyAirbyteSource.isFinished()).isTrue(); } @Test @@ -221,6 +223,8 @@ public void testGlobalNewStream() throws Exception { Assertions.assertThat(emptyAirbyteSource.attemptRead()) .isEmpty(); + + Assertions.assertThat(emptyAirbyteSource.isFinished()).isTrue(); } @Test @@ -243,6 +247,8 @@ public void testPerStream() throws Exception { Assertions.assertThat(emptyAirbyteSource.attemptRead()) .isEmpty(); + + Assertions.assertThat(emptyAirbyteSource.isFinished()).isTrue(); } @Test @@ -266,6 +272,8 @@ public void testPerStreamWithExtraState() throws Exception { Assertions.assertThat(emptyAirbyteSource.attemptRead()) .isEmpty(); + + Assertions.assertThat(emptyAirbyteSource.isFinished()).isTrue(); } @Test @@ -290,6 +298,8 @@ public void testPerStreamWithMissingState() throws Exception { Assertions.assertThat(emptyAirbyteSource.attemptRead()) .isEmpty(); + + Assertions.assertThat(emptyAirbyteSource.isFinished()).isTrue(); } @Test @@ -350,6 +360,8 @@ public void testLegacyWithNewConfig() throws Exception { Assertions.assertThat(emptyAirbyteSource.attemptRead()) .isEmpty(); + + Assertions.assertThat(emptyAirbyteSource.isFinished()).isTrue(); } @Test @@ -383,6 +395,8 @@ public void testLegacyWithNullState() throws Exception { Assertions.assertThat(emptyAirbyteSource.attemptRead()) .isEmpty(); + + Assertions.assertThat(emptyAirbyteSource.isFinished()).isTrue(); } private void testReceiveNullStreamState(final StreamDescriptor streamDescriptor) {