Skip to content

Commit

Permalink
KAFKA-8456: Stabilize flaky StoreUpgradeIntegrationTest (apache#6941)
Browse files Browse the repository at this point in the history
Reviewers: Boyang Chen <[email protected]>, Guozhang Wang <[email protected]>, Bill Bejeck <[email protected]>, A. Sophie Blee-Goldman <[email protected]>
  • Loading branch information
mjsax authored Aug 1, 2019
1 parent a028f59 commit 06e246d
Showing 1 changed file with 150 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,56 +329,65 @@ private <K, V> void processKeyValueAndVerifyPlainCount(final K key,
IntegerSerializer.class),
CLUSTER.time);

TestUtils.waitForCondition(() -> {
try {
final ReadOnlyKeyValueStore<K, V> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.keyValueStore());
try (final KeyValueIterator<K, V> all = store.all()) {
final List<KeyValue<K, V>> storeContent = new LinkedList<>();
while (all.hasNext()) {
storeContent.add(all.next());
TestUtils.waitForCondition(
() -> {
try {
final ReadOnlyKeyValueStore<K, V> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.keyValueStore());
try (final KeyValueIterator<K, V> all = store.all()) {
final List<KeyValue<K, V>> storeContent = new LinkedList<>();
while (all.hasNext()) {
storeContent.add(all.next());
}
return storeContent.equals(expectedStoreContent);
}
return storeContent.equals(expectedStoreContent);
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
}, "Could not get expected result in time.");
},
60_000L,
"Could not get expected result in time.");
}

private <K> void verifyCountWithTimestamp(final K key,
final long value,
final long timestamp) throws Exception {
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
final ValueAndTimestamp<Long> count = store.get(key);
return count.value() == value && count.timestamp() == timestamp;
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
}, "Could not get expected result in time.");
TestUtils.waitForCondition(
() -> {
try {
final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
final ValueAndTimestamp<Long> count = store.get(key);
return count.value() == value && count.timestamp() == timestamp;
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
},
60_000L,
"Could not get expected result in time.");
}

private <K> void verifyCountWithSurrogateTimestamp(final K key,
final long value) throws Exception {
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
final ValueAndTimestamp<Long> count = store.get(key);
return count.value() == value && count.timestamp() == -1L;
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
}, "Could not get expected result in time.");
TestUtils.waitForCondition(
() -> {
try {
final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
final ValueAndTimestamp<Long> count = store.get(key);
return count.value() == value && count.timestamp() == -1L;
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
},
60_000L,
"Could not get expected result in time.");
}

private <K, V> void processKeyValueAndVerifyCount(final K key,
Expand All @@ -394,23 +403,26 @@ private <K, V> void processKeyValueAndVerifyCount(final K key,
IntegerSerializer.class),
timestamp);

TestUtils.waitForCondition(() -> {
try {
final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
while (all.hasNext()) {
storeContent.add(all.next());
TestUtils.waitForCondition(
() -> {
try {
final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
while (all.hasNext()) {
storeContent.add(all.next());
}
return storeContent.equals(expectedStoreContent);
}
return storeContent.equals(expectedStoreContent);
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
}, "Could not get expected result in time.");
},
60_000L,
"Could not get expected result in time.");
}

private <K, V> void processKeyValueAndVerifyCountWithTimestamp(final K key,
Expand All @@ -426,23 +438,26 @@ private <K, V> void processKeyValueAndVerifyCountWithTimestamp(final K key,
IntegerSerializer.class),
timestamp);

TestUtils.waitForCondition(() -> {
try {
final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
while (all.hasNext()) {
storeContent.add(all.next());
TestUtils.waitForCondition(
() -> {
try {
final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
while (all.hasNext()) {
storeContent.add(all.next());
}
return storeContent.equals(expectedStoreContent);
}
return storeContent.equals(expectedStoreContent);
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
}, "Could not get expected result in time.");
},
60_000L,
"Could not get expected result in time.");
}

@Test
Expand Down Expand Up @@ -789,56 +804,65 @@ private <K, V> void processWindowedKeyValueAndVerifyPlainCount(final K key,
IntegerSerializer.class),
CLUSTER.time);

TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<K, V> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.windowStore());
try (final KeyValueIterator<Windowed<K>, V> all = store.all()) {
final List<KeyValue<Windowed<K>, V>> storeContent = new LinkedList<>();
while (all.hasNext()) {
storeContent.add(all.next());
TestUtils.waitForCondition(
() -> {
try {
final ReadOnlyWindowStore<K, V> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.windowStore());
try (final KeyValueIterator<Windowed<K>, V> all = store.all()) {
final List<KeyValue<Windowed<K>, V>> storeContent = new LinkedList<>();
while (all.hasNext()) {
storeContent.add(all.next());
}
return storeContent.equals(expectedStoreContent);
}
return storeContent.equals(expectedStoreContent);
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
}, "Could not get expected result in time.");
},
60_000L,
"Could not get expected result in time.");
}

private <K> void verifyWindowedCountWithSurrogateTimestamp(final Windowed<K> key,
final long value) throws Exception {
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start());
return count.value() == value && count.timestamp() == -1L;
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
}, "Could not get expected result in time.");
TestUtils.waitForCondition(
() -> {
try {
final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start());
return count.value() == value && count.timestamp() == -1L;
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
},
60_000L,
"Could not get expected result in time.");
}

private <K> void verifyWindowedCountWithTimestamp(final Windowed<K> key,
final long value,
final long timestamp) throws Exception {
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start());
return count.value() == value && count.timestamp() == timestamp;
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
}, "Could not get expected result in time.");
TestUtils.waitForCondition(
() -> {
try {
final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start());
return count.value() == value && count.timestamp() == timestamp;
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
},
60_000L,
"Could not get expected result in time.");
}

private <K, V> void processKeyValueAndVerifyWindowedCountWithTimestamp(final K key,
Expand All @@ -854,23 +878,26 @@ private <K, V> void processKeyValueAndVerifyWindowedCountWithTimestamp(final K k
IntegerSerializer.class),
timestamp);

TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<K, ValueAndTimestamp<V>> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
try (final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> all = store.all()) {
final List<KeyValue<Windowed<K>, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
while (all.hasNext()) {
storeContent.add(all.next());
TestUtils.waitForCondition(
() -> {
try {
final ReadOnlyWindowStore<K, ValueAndTimestamp<V>> store =
kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
try (final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> all = store.all()) {
final List<KeyValue<Windowed<K>, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
while (all.hasNext()) {
storeContent.add(all.next());
}
return storeContent.equals(expectedStoreContent);
}
return storeContent.equals(expectedStoreContent);
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
} catch (final Exception swallow) {
swallow.printStackTrace();
System.err.println(swallow.getMessage());
return false;
}
}, "Could not get expected result in time.");
},
60_000L,
"Could not get expected result in time.");
}

private static class KeyValueProcessor implements Processor<Integer, Integer> {
Expand Down

0 comments on commit 06e246d

Please sign in to comment.