Skip to content

Commit

Permalink
fix: deadlock when closing transient push query (#4297)
Browse files Browse the repository at this point in the history
fixes: #4296

The produce side not calls `offer` in a loop, with a short timeout, to try and put the row into the blocking queue. When the consume side closes the query, e.g. on an `EOFException` if the user has closed the connection, the query first closes the queue; setting a flag the producers are checking on each loop; causing any producers to exit the loop. Then it can safely close the KS topology.

(cherry picked from commit 6b5ce0c)
  • Loading branch information
big-andy-coates committed Jan 14, 2020
1 parent 4dcab06 commit ac8fb63
Show file tree
Hide file tree
Showing 10 changed files with 366 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.query;

import io.confluent.ksql.GenericRow;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KeyValue;

/**
* The queue between the Kafka-streams topology and the client connection.
*
* <p>The KS topology writes to the queue from its {@code StreamThread}, while the KSQL server
* thread that is servicing the client request reads from the queue and writes to the client
* socket.
*/
public interface BlockingRowQueue {

/**
* Sets the limit handler that will be called when any row limit is reached.
*
* <p>Replaces any previous handler.
*
* @param limitHandler the handler.
*/
void setLimitHandler(LimitHandler limitHandler);

/**
* Poll the queue for a single row
*
* @see BlockingQueue#poll(long, TimeUnit)
*/
KeyValue<String, GenericRow> poll(long timeout, TimeUnit unit)
throws InterruptedException;

/**
* Drain the queue to the supplied {@code collection}.
*
* @see BlockingQueue#drainTo(Collection)
*/
void drainTo(Collection<? super KeyValue<String, GenericRow>> collection);

/**
* The size of the queue.
*
* @see BlockingQueue#size()
*/
int size();

/**
* Close the queue.
*/
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public TransientQueryMetadata buildTransientQuery(
final LogicalSchema schema,
final OptionalInt limit
) {
final TransientQueryQueue queue = buildTransientQueryQueue(queryId, physicalPlan, limit);
final BlockingRowQueue queue = buildTransientQueryQueue(queryId, physicalPlan, limit);
final String transientQueryPrefix =
ksqlConfig.getString(KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG);
final String applicationId = addTimeSuffix(getQueryApplicationId(
Expand All @@ -171,9 +171,8 @@ public TransientQueryMetadata buildTransientQuery(
streams,
schema,
sources,
queue::setLimitHandler,
planSummary,
queue.getQueue(),
queue,
applicationId,
streamsBuilder.build(),
streamsProperties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@

package io.confluent.ksql.query;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.util.KsqlException;
import java.util.Collection;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
Expand All @@ -29,41 +32,62 @@
/**
* A queue of rows for transient queries.
*/
class TransientQueryQueue {
class TransientQueryQueue implements BlockingRowQueue {

private final LimitQueueCallback callback;
private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue =
new LinkedBlockingQueue<>(100);
private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue;
private final int offerTimeoutMs;
private volatile boolean closed = false;

TransientQueryQueue(final KStream<?, GenericRow> kstream, final OptionalInt limit) {
this(kstream, limit, 100, 100);
}

@VisibleForTesting
TransientQueryQueue(
final KStream<?, GenericRow> kstream,
final OptionalInt limit,
final int queueSizeLimit,
final int offerTimeoutMs
) {
this.callback = limit.isPresent()
? new LimitedQueueCallback(limit.getAsInt())
: new UnlimitedQueueCallback();
this.rowQueue = new LinkedBlockingQueue<>(queueSizeLimit);
this.offerTimeoutMs = offerTimeoutMs;

kstream.foreach(new TransientQueryQueue.QueuePopulator<>(rowQueue, callback));
kstream.foreach(new QueuePopulator<>());
}

BlockingQueue<KeyValue<String, GenericRow>> getQueue() {
return rowQueue;
@Override
public void setLimitHandler(final LimitHandler limitHandler) {
callback.setLimitHandler(limitHandler);
}

void setLimitHandler(final LimitHandler limitHandler) {
callback.setLimitHandler(limitHandler);
@Override
public KeyValue<String, GenericRow> poll(final long timeout, final TimeUnit unit)
throws InterruptedException {
return rowQueue.poll(timeout, unit);
}

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
static final class QueuePopulator<K> implements ForeachAction<K, GenericRow> {
@Override
public void drainTo(final Collection<? super KeyValue<String, GenericRow>> collection) {
rowQueue.drainTo(collection);
}

private final BlockingQueue<KeyValue<String, GenericRow>> queue;
private final QueueCallback callback;
@Override
public int size() {
return rowQueue.size();
}

QueuePopulator(
final BlockingQueue<KeyValue<String, GenericRow>> queue,
final QueueCallback callback
) {
this.queue = Objects.requireNonNull(queue, "queue");
this.callback = Objects.requireNonNull(callback, "callback");
}
@Override
public void close() {
closed = true;
}

@VisibleForTesting
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
final class QueuePopulator<K> implements ForeachAction<K, GenericRow> {

@Override
public void apply(final K key, final GenericRow row) {
Expand All @@ -76,18 +100,22 @@ public void apply(final K key, final GenericRow row) {
return;
}

final String keyString = getStringKey(key);
queue.put(new KeyValue<>(keyString, row));
final KeyValue<String, GenericRow> kv = new KeyValue<>(getStringKey(key), row);

callback.onQueued();
} catch (final InterruptedException exception) {
while (!closed) {
if (rowQueue.offer(kv, offerTimeoutMs, TimeUnit.MILLISECONDS)) {
callback.onQueued();
break;
}
}
} catch (final InterruptedException e) {
throw new KsqlException("InterruptedException while enqueueing:" + key);
}
}

private String getStringKey(final K key) {
if (key instanceof Windowed) {
final Windowed windowedKey = (Windowed) key;
final Windowed<?> windowedKey = (Windowed<?>) key;
return String.format("%s : %s", windowedKey.key(), windowedKey.window());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,34 @@

package io.confluent.ksql.util;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.LimitHandler;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.Topology;

/**
* Metadata of a transient query, e.g. {@code SELECT * FROM FOO;}.
*/
public class TransientQueryMetadata extends QueryMetadata {

private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue;
private final BlockingRowQueue rowQueue;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private final Consumer<LimitHandler> limitHandlerSetter;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
public TransientQueryMetadata(
final String statementString,
final KafkaStreams kafkaStreams,
final LogicalSchema logicalSchema,
final Set<SourceName> sourceNames,
final Consumer<LimitHandler> limitHandlerSetter,
final String executionPlan,
final BlockingQueue<KeyValue<String, GenericRow>> rowQueue,
final BlockingRowQueue rowQueue,
final String queryApplicationId,
final Topology topology,
final Map<String, Object> streamsProperties,
Expand All @@ -63,17 +59,16 @@ public TransientQueryMetadata(
topology,
streamsProperties,
overriddenProperties,
closeCallback
);
this.limitHandlerSetter = Objects.requireNonNull(limitHandlerSetter, "limitHandlerSetter");
closeCallback);

this.rowQueue = Objects.requireNonNull(rowQueue, "rowQueue");
}

public boolean isRunning() {
return isRunning.get();
}

public BlockingQueue<KeyValue<String, GenericRow>> getRowQueue() {
public BlockingRowQueue getRowQueue() {
return rowQueue;
}

Expand All @@ -94,11 +89,16 @@ public int hashCode() {
}

public void setLimitHandler(final LimitHandler limitHandler) {
limitHandlerSetter.accept(limitHandler);
rowQueue.setLimitHandler(limitHandler);
}

@Override
public void close() {
// To avoid deadlock, close the queue first to ensure producer side isn't blocked trying to
// write to the blocking queue, otherwise super.close call can deadlock:
rowQueue.close();

// Now safe to close:
super.close();
isRunning.set(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.util.KsqlConstants;
Expand All @@ -44,7 +45,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -216,7 +216,7 @@ public void shouldSelectAllFromDerivedStream() throws Exception {
"SELECT * from pageviews_female EMIT CHANGES;");

final List<KeyValue<String, GenericRow>> results = new ArrayList<>();
final BlockingQueue<KeyValue<String, GenericRow>> rowQueue = queryMetadata.getRowQueue();
final BlockingRowQueue rowQueue = queryMetadata.getRowQueue();

// From the mock data, we expect exactly 3 page views from female users.
final List<String> expectedPages = ImmutableList.of("PAGE_2", "PAGE_5", "PAGE_5");
Expand Down Expand Up @@ -402,7 +402,7 @@ private static List<GenericRow> verifyAvailableRows(
final TransientQueryMetadata queryMetadata,
final int expectedRows
) throws Exception {
final BlockingQueue<KeyValue<String, GenericRow>> rowQueue = queryMetadata.getRowQueue();
final BlockingRowQueue rowQueue = queryMetadata.getRowQueue();

TestUtils.waitForCondition(
() -> rowQueue.size() >= expectedRows,
Expand Down
Loading

0 comments on commit ac8fb63

Please sign in to comment.