feat: some robustness improvements for Connect integration
agavra committed Aug 19, 2019
1 parent a79adb4 commit 696dec3
Showing 13 changed files with 192 additions and 46 deletions.
5 changes: 5 additions & 0 deletions ksql-engine/pom.xml
@@ -73,6 +73,11 @@
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>slice</artifactId>
@@ -185,6 +185,8 @@ private void handleConnector(final String name) {
try {
final ConnectResponse<ConnectorInfo> describe = connectClient.describe(name);
if (!describe.datum().isPresent()) {
describe.error()
.ifPresent(error -> LOG.warn("Failed to describe connect {} due to: {}", name, error));
return;
}

@@ -15,10 +15,11 @@

package io.confluent.ksql.connect;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.execution.expression.tree.Literal;
import io.confluent.ksql.execution.expression.tree.QualifiedName;
@@ -32,12 +33,15 @@
import io.confluent.ksql.properties.with.CommonCreateConfigs;
import io.confluent.ksql.properties.with.CreateConfigs;
import io.confluent.ksql.util.KsqlConstants;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,23 +57,41 @@
* registry.</p>
*/
@ThreadSafe
final class ConnectPollingService extends AbstractScheduledService {
final class ConnectPollingService extends AbstractExecutionThreadService {

private static final Logger LOG = LoggerFactory.getLogger(ConnectPollingService.class);
private static final int INTERVAL_S = 30;
private static final Connector STOP_SENTINEL =
new Connector("_stop_", ignored -> false, Function.identity(), DataSourceType.KSTREAM, "");

private final KsqlExecutionContext executionContext;
private final Consumer<CreateSource> sourceCallback;
private final int pollIntervalSecs;

// we use a blocking queue as a thread safe buffer between this class
// and others that may be calling #addConnector(Connector) - this also
// allows us to notify when a connector was added.
private BlockingQueue<Connector> connectorQueue;
private Set<Connector> connectors;

ConnectPollingService(
final KsqlExecutionContext executionContext,
final Consumer<CreateSource> sourceCallback
) {
this(executionContext, sourceCallback, INTERVAL_S);
}

@VisibleForTesting
ConnectPollingService(
final KsqlExecutionContext executionContext,
final Consumer<CreateSource> sourceCallback,
final int pollIntervalSecs
) {
this.executionContext = Objects.requireNonNull(executionContext, "executionContext");
this.sourceCallback = Objects.requireNonNull(sourceCallback, "sourceCallback");
this.connectors = ConcurrentHashMap.newKeySet();
this.connectors = new HashSet<>();
this.connectorQueue = new LinkedBlockingDeque<>();
this.pollIntervalSecs = pollIntervalSecs;
}

/**
@@ -81,13 +103,33 @@ final class ConnectPollingService extends AbstractScheduledService {
* @param connector a connector to register
*/
void addConnector(final Connector connector) {
connectors.add(connector);
connectorQueue.add(connector);
}

@Override
protected void runOneIteration() {
protected void run() throws Exception {
while (isRunning()) {
final Connector connector = connectorQueue.poll(pollIntervalSecs, TimeUnit.SECONDS);
if (connector == STOP_SENTINEL || connectorQueue.removeIf(c -> c == STOP_SENTINEL)) {
return;
} else if (connector != null) {
connectors.add(connector);
}

drainQueue();
runOneIteration();
}
}

@VisibleForTesting
void drainQueue() {
connectorQueue.drainTo(connectors);
}

@VisibleForTesting
void runOneIteration() {
// avoid making external calls if unnecessary
if (connectors.isEmpty()) {
// avoid making external calls if unnecessary
return;
}

@@ -116,6 +158,13 @@ protected void runOneIteration() {
}
}

@Override
protected void triggerShutdown() {
// add the sentinel to the queue so that any blocking operation
// gets resolved
connectorQueue.add(STOP_SENTINEL);
}

private void handleTopic(
final String topic,
final Set<String> subjects,
Expand Down Expand Up @@ -151,8 +200,4 @@ private void handleTopic(
}
}

@Override
protected Scheduler scheduler() {
return Scheduler.newFixedRateSchedule(0, INTERVAL_S, TimeUnit.SECONDS);
}
}
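Note on the change above: replacing the fixed-rate AbstractScheduledService with an AbstractExecutionThreadService means the polling loop now blocks on the connector queue and can be woken immediately, both when a connector is added and when triggerShutdown() enqueues STOP_SENTINEL. A minimal sketch of how a caller might drive the standard Guava Service lifecycle around this class (the executionContext, sourceCallback and connector variables are placeholders, not part of this commit):

// Sketch only: exercises the Guava Service lifecycle inherited from
// AbstractExecutionThreadService.
final ConnectPollingService pollingService =
    new ConnectPollingService(executionContext, sourceCallback);

pollingService.startAsync().awaitRunning();  // spawns the thread running run()
pollingService.addConnector(connector);      // enqueued; picked up by the blocking poll

// stopAsync() invokes triggerShutdown(), which enqueues STOP_SENTINEL so the
// blocking poll returns right away instead of waiting out the poll interval.
pollingService.stopAsync().awaitTerminated();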
@@ -66,21 +66,23 @@ public interface ConnectClient {
class ConnectResponse<T> {
private final Optional<T> datum;
private final Optional<String> error;
private final int httpCode;

public static <T> ConnectResponse<T> of(final T datum) {
return new ConnectResponse<>(datum, null);
public static <T> ConnectResponse<T> of(final T datum, final int code) {
return new ConnectResponse<>(datum, null, code);
}

public static <T> ConnectResponse<T> of(final String error) {
return new ConnectResponse<>(null, error);
public static <T> ConnectResponse<T> of(final String error, final int code) {
return new ConnectResponse<>(null, error, code);
}

private ConnectResponse(final T datum, final String error) {
private ConnectResponse(final T datum, final String error, final int code) {
KsqlPreconditions.checkArgument(
datum != null ^ error != null,
"expected exactly one of datum or error to be null");
this.datum = Optional.ofNullable(datum);
this.error = Optional.ofNullable(error);
this.httpCode = code;
}

public Optional<T> datum() {
Expand All @@ -90,6 +92,10 @@ public Optional<T> datum() {
public Optional<String> error() {
return error;
}

public int httpCode() {
return httpCode;
}
}

}
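Since every ConnectResponse now carries the HTTP status code of the underlying Connect REST call, callers can distinguish transient server-side failures (5xx, which the client retries) from definitive rejections. A brief usage sketch, with an assumed connectClient instance and connector name that are not part of this commit:

// Sketch only: inspect both the payload/error and the HTTP code.
final ConnectResponse<ConnectorInfo> response = connectClient.describe("my-connector");

response.datum().ifPresent(info -> LOG.info("Connector config: {}", info.config()));
response.error().ifPresent(error -> {
  if (response.httpCode() >= HttpStatus.SC_INTERNAL_SERVER_ERROR) {
    // 5xx errors only surface here after DefaultConnectClient has exhausted its retries
    LOG.warn("Connect cluster error ({}): {}", response.httpCode(), error);
  } else {
    LOG.warn("Request rejected ({}): {}", response.httpCode(), error);
  }
});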
@@ -16,6 +16,10 @@
package io.confluent.ksql.services;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.util.KsqlException;
@@ -25,6 +29,8 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.http.HttpStatus;
import org.apache.http.client.ResponseHandler;
@@ -49,6 +55,7 @@ public class DefaultConnectClient implements ConnectClient {
private static final String CONNECTORS = "/connectors";
private static final String STATUS = "/status";
private static final int DEFAULT_TIMEOUT_MS = 5_000;
private static final int MAX_ATTEMPTS = 3;

private final URI connectUri;

@@ -74,7 +81,7 @@ public ConnectResponse<ConnectorInfo> create(
connector,
config);

final ConnectResponse<ConnectorInfo> connectResponse = Request
final ConnectResponse<ConnectorInfo> connectResponse = withRetries(() -> Request
.Post(connectUri.resolve(CONNECTORS))
.socketTimeout(DEFAULT_TIMEOUT_MS)
.connectTimeout(DEFAULT_TIMEOUT_MS)
@@ -87,7 +94,7 @@ public ConnectResponse<ConnectorInfo> create(
)
.execute()
.handleResponse(
createHandler(HttpStatus.SC_CREATED, ConnectorInfo.class, Function.identity()));
createHandler(HttpStatus.SC_CREATED, ConnectorInfo.class, Function.identity())));

connectResponse.error()
.ifPresent(error -> LOG.warn("Did not CREATE connector {}: {}", connector, error));
@@ -104,13 +111,13 @@ public ConnectResponse<List<String>> connectors() {
try {
LOG.debug("Issuing request to Kafka Connect at URI {} to list connectors", connectUri);

final ConnectResponse<List<String>> connectResponse = Request
final ConnectResponse<List<String>> connectResponse = withRetries(() -> Request
.Get(connectUri.resolve(CONNECTORS))
.socketTimeout(DEFAULT_TIMEOUT_MS)
.connectTimeout(DEFAULT_TIMEOUT_MS)
.execute()
.handleResponse(
createHandler(HttpStatus.SC_OK, List.class, foo -> (List<String>) foo));
createHandler(HttpStatus.SC_OK, List.class, foo -> (List<String>) foo)));

connectResponse.error()
.ifPresent(error -> LOG.warn("Could not list connectors: {}.", error));
@@ -128,13 +135,13 @@ public ConnectResponse<ConnectorStateInfo> status(final String connector) {
connectUri,
connector);

final ConnectResponse<ConnectorStateInfo> connectResponse = Request
final ConnectResponse<ConnectorStateInfo> connectResponse = withRetries(() -> Request
.Get(connectUri.resolve(CONNECTORS + "/" + connector + STATUS))
.socketTimeout(DEFAULT_TIMEOUT_MS)
.connectTimeout(DEFAULT_TIMEOUT_MS)
.execute()
.handleResponse(
createHandler(HttpStatus.SC_OK, ConnectorStateInfo.class, Function.identity()));
createHandler(HttpStatus.SC_OK, ConnectorStateInfo.class, Function.identity())));

connectResponse.error()
.ifPresent(error ->
@@ -152,13 +159,13 @@ public ConnectResponse<ConnectorInfo> describe(final String connector) {
LOG.debug("Issuing request to Kafka Connect at URI {} to get config for {}",
connectUri, connector);

final ConnectResponse<ConnectorInfo> connectResponse = Request
final ConnectResponse<ConnectorInfo> connectResponse = withRetries(() -> Request
.Get(connectUri.resolve(String.format("%s/%s", CONNECTORS, connector)))
.socketTimeout(DEFAULT_TIMEOUT_MS)
.connectTimeout(DEFAULT_TIMEOUT_MS)
.execute()
.handleResponse(
createHandler(HttpStatus.SC_OK, ConnectorInfo.class, Function.identity()));
createHandler(HttpStatus.SC_OK, ConnectorInfo.class, Function.identity())));

connectResponse.error()
.ifPresent(error -> LOG.warn("Could not list connectors: {}.", error));
@@ -169,22 +176,49 @@ public ConnectResponse<ConnectorInfo> describe(final String connector) {
}
}

@SuppressWarnings("unchecked")
private static <T> ConnectResponse<T> withRetries(final Callable<ConnectResponse<T>> action) {
try {
return RetryerBuilder.<ConnectResponse<T>>newBuilder()
.withStopStrategy(StopStrategies.stopAfterAttempt(MAX_ATTEMPTS))
.withWaitStrategy(WaitStrategies.exponentialWait())
.retryIfResult(
result -> result == null || result.httpCode() >= HttpStatus.SC_INTERNAL_SERVER_ERROR)
.retryIfException()
.build()
.call(action);
} catch (ExecutionException e) {
// this should never happen because we retryIfException()
throw new KsqlServerException("Unexpected exception!", e);
} catch (RetryException e) {
LOG.warn("Failed to query connect cluster after {} attempts.", e.getNumberOfFailedAttempts());
if (e.getLastFailedAttempt().hasResult()) {
return (ConnectResponse<T>) e.getLastFailedAttempt().getResult();
}

// should rarely happen - only if some IOException happens and we didn't
// even get to send the request to the server
throw new KsqlServerException(e.getCause());
}
}

private static <T, C> ResponseHandler<ConnectResponse<T>> createHandler(
final int expectedStatus,
final Class<C> entityClass,
final Function<C, T> cast
) {
return httpResponse -> {
final int code = httpResponse.getStatusLine().getStatusCode();
if (httpResponse.getStatusLine().getStatusCode() != expectedStatus) {
final String entity = EntityUtils.toString(httpResponse.getEntity());
return ConnectResponse.of(entity);
return ConnectResponse.of(entity, code);
}

final T info = cast.apply(MAPPER.readValue(
httpResponse.getEntity().getContent(),
entityClass));

return ConnectResponse.of(info);
return ConnectResponse.of(info, code);
};
}
}
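The retry behaviour in withRetries comes from the guava-retrying dependency added to ksql-engine/pom.xml. A self-contained sketch of the same builder pattern, independent of this commit (the flakyCall callable and the 500 threshold mirror the policy above but are purely illustrative):

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import java.util.concurrent.Callable;

public final class RetryExample {

  public static void main(final String[] args) throws Exception {
    // Hypothetical flaky call that always reports an HTTP-like 503 status.
    final Callable<Integer> flakyCall = () -> 503;

    final Retryer<Integer> retryer = RetryerBuilder.<Integer>newBuilder()
        .retryIfException()                                   // retry when the call throws
        .retryIfResult(code -> code == null || code >= 500)   // retry on server-side errors
        .withWaitStrategy(WaitStrategies.exponentialWait())   // back off between attempts
        .withStopStrategy(StopStrategies.stopAfterAttempt(3)) // give up after 3 tries
        .build();

    try {
      System.out.println("Result: " + retryer.call(flakyCall));
    } catch (RetryException e) {
      // the last attempt is still available, mirroring how withRetries returns
      // the final ConnectResponse instead of propagating the RetryException
      System.out.println("Gave up after " + e.getNumberOfFailedAttempts()
          + " attempts, last result: " + e.getLastFailedAttempt().getResult());
    }
  }
}

Retrying on the HTTP status of a well-formed error body, rather than only on thrown exceptions, is what lets the client paper over transient 5xx answers from the Connect cluster.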
@@ -21,6 +21,7 @@
import io.confluent.ksql.services.ConnectClient.ConnectResponse;
import io.confluent.ksql.util.LimitedProxyBuilder;
import java.util.Map;
import org.apache.http.HttpStatus;

/**
* Supplies {@link ConnectClient}s to use that do not make any
@@ -32,10 +33,14 @@ private SandboxConnectClient() { }

public static ConnectClient createProxy() {
return LimitedProxyBuilder.forClass(ConnectClient.class)
.swallow("create", methodParams(String.class, Map.class), ConnectResponse.of("sandbox"))
.swallow("describe", methodParams(String.class), ConnectResponse.of("sandbox"))
.swallow("connectors", methodParams(), ConnectResponse.of(ImmutableList.of()))
.swallow("status", methodParams(String.class), ConnectResponse.of("sandbox"))
.swallow("create", methodParams(String.class, Map.class),
ConnectResponse.of("sandbox", HttpStatus.SC_INTERNAL_SERVER_ERROR))
.swallow("describe", methodParams(String.class),
ConnectResponse.of("sandbox", HttpStatus.SC_INTERNAL_SERVER_ERROR))
.swallow("connectors", methodParams(),
ConnectResponse.of(ImmutableList.of(), HttpStatus.SC_OK))
.swallow("status", methodParams(String.class),
ConnectResponse.of("sandbox", HttpStatus.SC_INTERNAL_SERVER_ERROR))
.build();
}
}
@@ -16,7 +16,6 @@
package io.confluent.ksql.connect;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyFloat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;
@@ -30,8 +29,8 @@
import io.confluent.ksql.util.KsqlServerException;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.http.HttpStatus;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -234,9 +233,10 @@ private void givenConnectors(final String... names){
ImmutableMap.of(),
ImmutableList.of(),
ConnectorType.SOURCE
)));
), HttpStatus.SC_CREATED));
}
when(connectClient.connectors()).thenReturn(ConnectResponse.of(ImmutableList.copyOf(names)));
when(connectClient.connectors()).thenReturn(ConnectResponse.of(ImmutableList.copyOf(names),
HttpStatus.SC_OK));
}

private OngoingStubbing<?> givenConnectorRecord(OngoingStubbing<?> stubbing) {