Skip to content

Commit

Permalink
Revert "fix: deadlock when closing transient push query (confluentinc…
Browse files Browse the repository at this point in the history
…#4297)"

This reverts commit ac8fb63
  • Loading branch information
big-andy-coates committed Jan 14, 2020
1 parent cb577e1 commit 2033e91
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 366 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public TransientQueryMetadata buildTransientQuery(
final LogicalSchema schema,
final OptionalInt limit
) {
final BlockingRowQueue queue = buildTransientQueryQueue(queryId, physicalPlan, limit);
final TransientQueryQueue queue = buildTransientQueryQueue(queryId, physicalPlan, limit);
final String transientQueryPrefix =
ksqlConfig.getString(KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG);
final String applicationId = addTimeSuffix(getQueryApplicationId(
Expand All @@ -171,8 +171,9 @@ public TransientQueryMetadata buildTransientQuery(
streams,
schema,
sources,
queue::setLimitHandler,
planSummary,
queue,
queue.getQueue(),
applicationId,
streamsBuilder.build(),
streamsProperties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@

package io.confluent.ksql.query;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.util.KsqlException;
import java.util.Collection;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
Expand All @@ -32,62 +29,41 @@
/**
* A queue of rows for transient queries.
*/
class TransientQueryQueue implements BlockingRowQueue {
class TransientQueryQueue {

private final LimitQueueCallback callback;
private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue;
private final int offerTimeoutMs;
private volatile boolean closed = false;
private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue =
new LinkedBlockingQueue<>(100);

TransientQueryQueue(final KStream<?, GenericRow> kstream, final OptionalInt limit) {
this(kstream, limit, 100, 100);
}

@VisibleForTesting
TransientQueryQueue(
final KStream<?, GenericRow> kstream,
final OptionalInt limit,
final int queueSizeLimit,
final int offerTimeoutMs
) {
this.callback = limit.isPresent()
? new LimitedQueueCallback(limit.getAsInt())
: new UnlimitedQueueCallback();
this.rowQueue = new LinkedBlockingQueue<>(queueSizeLimit);
this.offerTimeoutMs = offerTimeoutMs;

kstream.foreach(new QueuePopulator<>());
}

@Override
public void setLimitHandler(final LimitHandler limitHandler) {
callback.setLimitHandler(limitHandler);
kstream.foreach(new TransientQueryQueue.QueuePopulator<>(rowQueue, callback));
}

@Override
public KeyValue<String, GenericRow> poll(final long timeout, final TimeUnit unit)
throws InterruptedException {
return rowQueue.poll(timeout, unit);
BlockingQueue<KeyValue<String, GenericRow>> getQueue() {
return rowQueue;
}

@Override
public void drainTo(final Collection<? super KeyValue<String, GenericRow>> collection) {
rowQueue.drainTo(collection);
void setLimitHandler(final LimitHandler limitHandler) {
callback.setLimitHandler(limitHandler);
}

@Override
public int size() {
return rowQueue.size();
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
static final class QueuePopulator<K> implements ForeachAction<K, GenericRow> {

@Override
public void close() {
closed = true;
}
private final BlockingQueue<KeyValue<String, GenericRow>> queue;
private final QueueCallback callback;

@VisibleForTesting
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
final class QueuePopulator<K> implements ForeachAction<K, GenericRow> {
QueuePopulator(
final BlockingQueue<KeyValue<String, GenericRow>> queue,
final QueueCallback callback
) {
this.queue = Objects.requireNonNull(queue, "queue");
this.callback = Objects.requireNonNull(callback, "callback");
}

@Override
public void apply(final K key, final GenericRow row) {
Expand All @@ -100,22 +76,18 @@ public void apply(final K key, final GenericRow row) {
return;
}

final KeyValue<String, GenericRow> kv = new KeyValue<>(getStringKey(key), row);
final String keyString = getStringKey(key);
queue.put(new KeyValue<>(keyString, row));

while (!closed) {
if (rowQueue.offer(kv, offerTimeoutMs, TimeUnit.MILLISECONDS)) {
callback.onQueued();
break;
}
}
} catch (final InterruptedException e) {
callback.onQueued();
} catch (final InterruptedException exception) {
throw new KsqlException("InterruptedException while enqueueing:" + key);
}
}

private String getStringKey(final K key) {
if (key instanceof Windowed) {
final Windowed<?> windowedKey = (Windowed<?>) key;
final Windowed windowedKey = (Windowed) key;
return String.format("%s : %s", windowedKey.key(), windowedKey.window());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,38 @@

package io.confluent.ksql.util;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.LimitHandler;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.Topology;

/**
* Metadata of a transient query, e.g. {@code SELECT * FROM FOO;}.
*/
public class TransientQueryMetadata extends QueryMetadata {

private final BlockingRowQueue rowQueue;
private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private final Consumer<LimitHandler> limitHandlerSetter;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
public TransientQueryMetadata(
final String statementString,
final KafkaStreams kafkaStreams,
final LogicalSchema logicalSchema,
final Set<SourceName> sourceNames,
final Consumer<LimitHandler> limitHandlerSetter,
final String executionPlan,
final BlockingRowQueue rowQueue,
final BlockingQueue<KeyValue<String, GenericRow>> rowQueue,
final String queryApplicationId,
final Topology topology,
final Map<String, Object> streamsProperties,
Expand All @@ -59,16 +63,17 @@ public TransientQueryMetadata(
topology,
streamsProperties,
overriddenProperties,
closeCallback);

closeCallback
);
this.limitHandlerSetter = Objects.requireNonNull(limitHandlerSetter, "limitHandlerSetter");
this.rowQueue = Objects.requireNonNull(rowQueue, "rowQueue");
}

public boolean isRunning() {
return isRunning.get();
}

public BlockingRowQueue getRowQueue() {
public BlockingQueue<KeyValue<String, GenericRow>> getRowQueue() {
return rowQueue;
}

Expand All @@ -89,16 +94,11 @@ public int hashCode() {
}

public void setLimitHandler(final LimitHandler limitHandler) {
rowQueue.setLimitHandler(limitHandler);
limitHandlerSetter.accept(limitHandler);
}

@Override
public void close() {
// To avoid deadlock, close the queue first to ensure producer side isn't blocked trying to
// write to the blocking queue, otherwise super.close call can deadlock:
rowQueue.close();

// Now safe to close:
super.close();
isRunning.set(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.util.KsqlConstants;
Expand All @@ -45,6 +44,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -216,7 +216,7 @@ public void shouldSelectAllFromDerivedStream() throws Exception {
"SELECT * from pageviews_female EMIT CHANGES;");

final List<KeyValue<String, GenericRow>> results = new ArrayList<>();
final BlockingRowQueue rowQueue = queryMetadata.getRowQueue();
final BlockingQueue<KeyValue<String, GenericRow>> rowQueue = queryMetadata.getRowQueue();

// From the mock data, we expect exactly 3 page views from female users.
final List<String> expectedPages = ImmutableList.of("PAGE_2", "PAGE_5", "PAGE_5");
Expand Down Expand Up @@ -402,7 +402,7 @@ private static List<GenericRow> verifyAvailableRows(
final TransientQueryMetadata queryMetadata,
final int expectedRows
) throws Exception {
final BlockingRowQueue rowQueue = queryMetadata.getRowQueue();
final BlockingQueue<KeyValue<String, GenericRow>> rowQueue = queryMetadata.getRowQueue();

TestUtils.waitForCondition(
() -> rowQueue.size() >= expectedRows,
Expand Down
Loading

0 comments on commit 2033e91

Please sign in to comment.