Skip to content

Commit

Permalink
feat: add connector status to LIST CONNECTORS (#4077)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Dec 6, 2019
1 parent b6a20b9 commit 5ff94b6
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public class ConnectorListTableBuilder implements TableBuilder<ConnectorList> {

private static final List<String> HEADERS = ImmutableList.of(
"Connector Name", "Type", "Class"
"Connector Name", "Type", "Class", "Status"
);


Expand All @@ -38,7 +38,8 @@ public Table buildTable(final ConnectorList entity) {
.map(info -> ImmutableList.of(
info.getName(),
ObjectUtils.defaultIfNull(info.getType(), ConnectorType.UNKNOWN).name(),
ObjectUtils.defaultIfNull(info.getClassName(), ""))))
ObjectUtils.defaultIfNull(info.getClassName(), ""),
ObjectUtils.defaultIfNull(info.getState(), ""))))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -840,8 +840,8 @@ public void shouldPrintConnectorsList() {
"statement",
ImmutableList.of(),
ImmutableList.of(
new SimpleConnectorInfo("foo", ConnectorType.SOURCE, "clazz"),
new SimpleConnectorInfo("bar", null, null)
new SimpleConnectorInfo("foo", ConnectorType.SOURCE, "clazz", "STATUS"),
new SimpleConnectorInfo("bar", null, null, null)
))
));

Expand All @@ -859,18 +859,19 @@ public void shouldPrintConnectorsList() {
+ " \"connectors\" : [ {\n"
+ " \"name\" : \"foo\",\n"
+ " \"type\" : \"source\",\n"
+ " \"className\" : \"clazz\"\n"
+ " \"className\" : \"clazz\",\n"
+ " \"state\" : \"STATUS\"\n"
+ " }, {\n"
+ " \"name\" : \"bar\"\n"
+ " } ]\n"
+ "} ]\n"));
} else {
assertThat(output, is("\n"
+ " Connector Name | Type | Class \n"
+ "----------------------------------\n"
+ " foo | SOURCE | clazz \n"
+ " bar | UNKNOWN | \n"
+ "----------------------------------\n"));
+ " Connector Name | Type | Class | Status \n"
+ "-------------------------------------------\n"
+ " foo | SOURCE | clazz | STATUS \n"
+ " bar | UNKNOWN | | \n"
+ "-------------------------------------------\n"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.runtime.AbstractStatus.State;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.AbstractState;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;

public final class ListConnectorsExecutor {
Expand All @@ -58,15 +61,18 @@ public static Optional<KsqlEntity> execute(
final List<SimpleConnectorInfo> infos = new ArrayList<>();
final List<KsqlWarning> warnings = new ArrayList<>();
final Scope scope = configuredStatement.getStatement().getScope();

for (final String name : connectors.datum().get()) {
final ConnectResponse<ConnectorInfo> response = connectClient.describe(name);

if (response.datum().filter(i -> inScope(i.type(), scope)).isPresent()) {
infos.add(fromConnectorInfoResponse(name, response)
);
final ConnectResponse<ConnectorStateInfo> status = connectClient.status(name);
infos.add(fromConnectorInfoResponse(name, response, status));
} else if (response.error().isPresent()) {
if (scope == Scope.ALL) {
infos.add(new SimpleConnectorInfo(name, ConnectorType.UNKNOWN, null));
infos.add(new SimpleConnectorInfo(name, ConnectorType.UNKNOWN, null, null));
}

warnings.add(
new KsqlWarning(
String.format(
Expand Down Expand Up @@ -96,16 +102,35 @@ private static boolean inScope(final ConnectorType type, final Scope scope) {
@SuppressWarnings("OptionalGetWithoutIsPresent")
private static SimpleConnectorInfo fromConnectorInfoResponse(
final String name,
final ConnectResponse<ConnectorInfo> response
final ConnectResponse<ConnectorInfo> response,
final ConnectResponse<ConnectorStateInfo> status
) {
if (response.error().isPresent()) {
return new SimpleConnectorInfo(name, null, null);
if (response.error().isPresent() || status.error().isPresent()) {
return new SimpleConnectorInfo(name, null, null, status.datum().get().connector().state());
}

final ConnectorInfo info = response.datum().get();
return new SimpleConnectorInfo(
name,
info.type(),
info.config().get(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
info.config().get(ConnectorConfig.CONNECTOR_CLASS_CONFIG),
summarizeState(status.datum().get())
);
}

private static String summarizeState(final ConnectorStateInfo connectorState) {
if (!connectorState.connector().state().equals(State.RUNNING.name())) {
return connectorState.connector().state();
}

final long numRunningTasks = connectorState.tasks()
.stream()
.map(AbstractState::state)
.filter(State.RUNNING.name()::equals)
.count();

return String.format("RUNNING (%s/%s tasks RUNNING)",
numRunningTasks,
connectorState.tasks().size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ public void shouldListConnectors() {
assertThat(
((ConnectorList) response.getResponse().get(0)).getConnectors().get(0).getName(),
is("mock-connector"));
assertThat(
((ConnectorList) response.getResponse().get(0)).getConnectors().get(0).getState(),
is("RUNNING (1/1 tasks RUNNING)"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import org.apache.http.HttpStatus;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.ConnectorState;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.TaskState;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -58,6 +61,16 @@ public class ListConnectorsExecutorTest {
ConnectorType.SOURCE
);

private static final ConnectorStateInfo STATUS = new ConnectorStateInfo(
"connector",
new ConnectorState("RUNNING", "foo", "bar"),
ImmutableList.of(
new TaskState(0, "RUNNING", "", ""),
new TaskState(1, "FAILED", "", "")
),
ConnectorType.SOURCE
);

@Mock
private KsqlExecutionContext engine;
@Mock
Expand All @@ -70,6 +83,8 @@ public void setUp() {
when(serviceContext.getConnectClient()).thenReturn(connectClient);
when(connectClient.describe("connector"))
.thenReturn(ConnectResponse.success(INFO, HttpStatus.SC_OK));
when(connectClient.status("connector"))
.thenReturn(ConnectResponse.success(STATUS, HttpStatus.SC_OK));
when(connectClient.describe("connector2"))
.thenReturn(ConnectResponse.failure("DANGER WILL ROBINSON.", HttpStatus.SC_NOT_FOUND));
}
Expand Down Expand Up @@ -97,7 +112,7 @@ public void shouldListValidConnector() {
"",
ImmutableList.of(),
ImmutableList.of(
new SimpleConnectorInfo("connector", ConnectorType.SOURCE, CONNECTOR_CLASS)
new SimpleConnectorInfo("connector", ConnectorType.SOURCE, CONNECTOR_CLASS, "RUNNING (1/2 tasks RUNNING)")
)
)));
}
Expand Down Expand Up @@ -153,7 +168,7 @@ public void shouldListInvalidConnectorWithNoInfo() {
ImmutableList.of(
new KsqlWarning("Could not describe connector connector2: DANGER WILL ROBINSON.")),
ImmutableList.of(
new SimpleConnectorInfo("connector2", ConnectorType.UNKNOWN, null)
new SimpleConnectorInfo("connector2", ConnectorType.UNKNOWN, null, null)
)
)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,19 @@ public class SimpleConnectorInfo {
private final String name;
private final ConnectorType type;
private final String className;
private final String state;

@JsonCreator
public SimpleConnectorInfo(
@JsonProperty("name") final String name,
@JsonProperty("type") final ConnectorType type,
@JsonProperty("className") final String className
@JsonProperty("className") final String className,
@JsonProperty("state") final String state
) {
this.name = Objects.requireNonNull(name, "name");
this.type = type;
this.className = className;
this.state = state;
}

public String getName() {
Expand All @@ -56,6 +59,10 @@ public String getClassName() {
return className;
}

public String getState() {
return state;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -67,12 +74,13 @@ public boolean equals(final Object o) {
final SimpleConnectorInfo that = (SimpleConnectorInfo) o;
return Objects.equals(name, that.name)
&& type == that.type
&& Objects.equals(className, that.className);
&& Objects.equals(className, that.className)
&& Objects.equals(state, that.state);
}

@Override
public int hashCode() {
return Objects.hash(name, type, className);
return Objects.hash(name, type, className, state);
}

@Override
Expand All @@ -81,6 +89,7 @@ public String toString() {
+ "name='" + name + '\''
+ ", type=" + type
+ ", className='" + className + '\''
+ ", state=" + state
+ '}';
}
}

0 comments on commit 5ff94b6

Please sign in to comment.