Skip to content

Commit

Permalink
fix: Don't wait for streams thread to be in running state (#4908)
Browse files Browse the repository at this point in the history
* don't wait with standby
  • Loading branch information
vpapavas authored and spena committed Mar 27, 2020
1 parent 17e2355 commit 2f83119
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 90 deletions.
12 changes: 0 additions & 12 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,6 @@ public class KsqlConfig extends AbstractConfig {
+ "or set in the CLI. It's only enabled when lag.reporting.enable is true. "
+ "By default, any amount of lag is is allowed.";

public static final String KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG =
"ksql.query.pull.streamsstore.rebalancing.timeout.ms";
public static final Long KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DEFAULT = 10000L;
public static final String KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DOC = "Timeout in "
+ "milliseconds when waiting for rebalancing of the stream store during a pull query";

public static final String KSQL_QUERY_PULL_METRICS_ENABLED =
"ksql.query.pull.metrics.enabled";
public static final String KSQL_QUERY_PULL_METRICS_ENABLED_DOC =
Expand Down Expand Up @@ -572,12 +566,6 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
zeroOrPositive(),
Importance.MEDIUM,
KSQL_QUERY_PULL_MAX_ALLOWED_OFFSET_LAG_DOC
).define(
KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DEFAULT,
Importance.LOW,
KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DOC
).define(
KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.execution.streams.materialization.MaterializationException;
import io.confluent.ksql.execution.streams.materialization.MaterializationTimeOutException;
import io.confluent.ksql.execution.streams.materialization.NotRunningException;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.KsqlConfig;
import java.util.function.Supplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StoreQueryParameters;
Expand All @@ -38,39 +36,25 @@ class KsStateStore {
private final KafkaStreams kafkaStreams;
private final LogicalSchema schema;
private final KsqlConfig ksqlConfig;
private final Supplier<Long> clock;

KsStateStore(
final String stateStoreName,
final KafkaStreams kafkaStreams,
final LogicalSchema schema,
final KsqlConfig ksqlConfig
) {
this(stateStoreName, kafkaStreams, schema, ksqlConfig, System::currentTimeMillis);
}

@VisibleForTesting
KsStateStore(
final String stateStoreName,
final KafkaStreams kafkaStreams,
final LogicalSchema schema,
final KsqlConfig ksqlConfig,
final Supplier<Long> clock
final KsqlConfig ksqlConfig
) {
this.kafkaStreams = requireNonNull(kafkaStreams, "kafkaStreams");
this.stateStoreName = requireNonNull(stateStoreName, "stateStoreName");
this.schema = requireNonNull(schema, "schema");
this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig");
this.clock = requireNonNull(clock, "clock");
}

LogicalSchema schema() {
return schema;
}

<T> T store(final QueryableStoreType<T> queryableStoreType) {
awaitRunning();

try {
if (ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STANDBY_READS)) {
// True flag allows queries on standby and replica state stores
Expand All @@ -91,19 +75,4 @@ <T> T store(final QueryableStoreType<T> queryableStoreType) {
throw new MaterializationException("State store currently unavailable: " + stateStoreName, e);
}
}

private void awaitRunning() {
final long timeoutMs =
ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG);
final long threshold = clock.get() + timeoutMs;
while (kafkaStreams.state() == State.REBALANCING) {
if (clock.get() > threshold) {
throw new MaterializationTimeOutException("Store failed to rebalance within the configured "
+ "timeout. timeout: " + timeoutMs + "ms, config: "
+ KsqlConfig.KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG);
}

Thread.yield();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,19 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.testing.NullPointerTester;
import com.google.common.testing.NullPointerTester.Visibility;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.execution.streams.materialization.MaterializationException;
import io.confluent.ksql.execution.streams.materialization.MaterializationTimeOutException;
import io.confluent.ksql.execution.streams.materialization.NotRunningException;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.util.KsqlConfig;
import java.util.function.Supplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StoreQueryParameters;
Expand All @@ -47,7 +44,6 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
Expand All @@ -59,35 +55,25 @@
public class KsStateStoreTest {

private static final String STORE_NAME = "someStore";
private static final Long TIMEOUT_MS = 10L;
private static final LogicalSchema SCHEMA = LogicalSchema.builder()
.keyColumn(ColumnName.of("k0"), SqlTypes.STRING)
.keyColumn(ColumnName.of("v0"), SqlTypes.BIGINT)
.build();

@Rule
public final Timeout timeout = Timeout.seconds(10);

@Rule
public final ExpectedException expectedException = ExpectedException.none();

@Mock
private KafkaStreams kafkaStreams;
@Mock
private Supplier<Long> clock;
@Mock
private KsqlConfig ksqlConfig;

private KsStateStore store;

@Before
public void setUp() {
store = new KsStateStore(STORE_NAME, kafkaStreams, SCHEMA, ksqlConfig, clock);

when(clock.get()).thenReturn(0L);
store = new KsStateStore(STORE_NAME, kafkaStreams, SCHEMA, ksqlConfig);
when(kafkaStreams.state()).thenReturn(State.RUNNING);
when(ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG))
.thenReturn(TIMEOUT_MS);
}

@SuppressWarnings("UnstableApiUsage")
Expand All @@ -96,43 +82,21 @@ public void shouldThrowNPEs() {
new NullPointerTester()
.setDefault(KafkaStreams.class, kafkaStreams)
.setDefault(LogicalSchema.class, SCHEMA)
.setDefault(Supplier.class, clock)
.setDefault(KsqlConfig.class, ksqlConfig)
.testConstructors(KsStateStore.class, Visibility.PACKAGE);
}

@Test
public void shouldAwaitRunning() {
public void shouldNotAwaitRunning() {
// Given:
when(kafkaStreams.state())
.thenReturn(State.REBALANCING)
.thenReturn(State.REBALANCING)
.thenReturn(State.RUNNING);

final QueryableStoreType<ReadOnlySessionStore<String, Long>> storeType =
QueryableStoreTypes.sessionStore();

// When:

store.store(storeType);

// Then:
verify(kafkaStreams, atLeast(3)).state();
}

@Test
public void shouldThrowIfDoesNotFinishRebalanceBeforeTimeout() {
// Given:
when(kafkaStreams.state()).thenReturn(State.REBALANCING);
when(clock.get()).thenReturn(0L, 5L, TIMEOUT_MS + 1);

// When:
expectedException.expect(MaterializationTimeOutException.class);
expectedException.expectMessage(
"Store failed to rebalance within the configured timeout. timeout: 10ms");

// When:
store.store(QueryableStoreTypes.sessionStore());
verify(kafkaStreams, never()).state();
}

@Test
Expand All @@ -144,24 +108,20 @@ public void shouldThrowIfNotRunningAfterFailedToGetStore() {
when(kafkaStreams.store(any())).thenThrow(new IllegalStateException());

// When:
expectedException.expect(NotRunningException.class);
expectedException.expectMessage("The query was not in a running state. state: NOT_RUNNING");
expectedException.expect(MaterializationException.class);
expectedException.expectMessage("State store currently unavailable: someStore");

// When:
store.store(QueryableStoreTypes.sessionStore());
}

@Test
public void shouldGetStoreOnceRunning() {
// Given:
when(kafkaStreams.state()).thenReturn(State.RUNNING);

// When:
store.store(QueryableStoreTypes.<String, Long>sessionStore());

// Then:
final InOrder inOrder = Mockito.inOrder(kafkaStreams);
inOrder.verify(kafkaStreams, atLeast(1)).state();
inOrder.verify(kafkaStreams).store(any());
}

Expand Down

0 comments on commit 2f83119

Please sign in to comment.