Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle 'RUN SCRIPT' directly in CLI, rather than post it to KSQL. #2331

Merged
merged 20 commits into from
Jan 21, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
37606f3
Initial new `run` command
big-andy-coates Dec 21, 2018
cdd315d
Merge branch 'master' into cli_run_script
big-andy-coates Jan 9, 2019
9878b3c
- support for CLI commands with spaces in the name.
big-andy-coates Jan 10, 2019
1d87461
remove old runscript functionality
big-andy-coates Jan 11, 2019
64c08ca
fix test
big-andy-coates Jan 11, 2019
196a525
fix test
big-andy-coates Jan 12, 2019
e64be32
Hojjat's requested changes.
big-andy-coates Jan 15, 2019
07d6db3
Merge branch 'master' into cli_run_script
big-andy-coates Jan 15, 2019
1dccb8e
Almog's requested changes.
big-andy-coates Jan 16, 2019
3e618ce
Merge branch 'cli_run_script' of github.com:big-andy-coates/ksql into…
big-andy-coates Jan 16, 2019
0604482
Almog's requested changes.
big-andy-coates Jan 16, 2019
6676651
Merge branch 'master' into cli_run_script
big-andy-coates Jan 16, 2019
c2653e7
Merge branch 'master' into cli_run_script
big-andy-coates Jan 16, 2019
a7d7e2c
Rohan's requested changes (ish)
big-andy-coates Jan 18, 2019
8cebfc8
Merge branch 'cli_run_script' of github.com:big-andy-coates/ksql into…
big-andy-coates Jan 18, 2019
1d3747f
Merge branch 'master' into cli_run_script
big-andy-coates Jan 18, 2019
6f91b84
Fix tests.
big-andy-coates Jan 18, 2019
4ea01cd
Merge branch 'master' into cli_run_script
big-andy-coates Jan 21, 2019
1de4a61
Merge branch 'cli_run_script' of github.com:big-andy-coates/ksql into…
big-andy-coates Jan 21, 2019
f57a8ab
Add tests to cover javadoc comments.
big-andy-coates Jan 21, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 24 additions & 44 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.confluent.ksql.cli.console.Console;
import io.confluent.ksql.cli.console.KsqlTerminal.StatusClosable;
import io.confluent.ksql.cli.console.OutputFormat;
import io.confluent.ksql.cli.console.cmd.CliCommandRegisterUtil;
import io.confluent.ksql.cli.console.cmd.RemoteServerSpecificCommand;
import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.parser.AstBuilder;
Expand All @@ -44,8 +45,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -57,13 +56,14 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.jline.reader.EndOfFileException;
import org.jline.reader.UserInterruptException;
import org.jline.terminal.Terminal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Cli implements Closeable {
public class Cli implements KsqlRequestExecutor, Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(Cli.class);

Expand All @@ -81,7 +81,7 @@ public static Cli build(
final OutputFormat outputFormat,
final KsqlRestClient restClient
) {
final Console console = Console.build(outputFormat, restClient);
final Console console = Console.build(outputFormat);
return new Cli(streamedQueryRowLimit, streamedQueryTimeoutMs, restClient, console);
}

Expand All @@ -100,8 +100,22 @@ public static Cli build(
this.terminal = terminal;
this.queryStreamExecutorService = Executors.newSingleThreadExecutor();

final Supplier<String> versionSuppler =
() -> restClient.getServerInfo().getResponse().getVersion();

CliCommandRegisterUtil.registerDefaultCommands(this, terminal, versionSuppler);

terminal
.registerCliSpecificCommand(new RemoteServerSpecificCommand(restClient, terminal.writer()));
.registerCliSpecificCommand(new RemoteServerSpecificCommand(restClient));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this line into registerDefaultCommands?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

@Override
public void makeKsqlRequest(final String requestBody) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename to something like statementText or statements - requestBody implies the http request body

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

try {
printKsqlResponse(restClient.makeKsqlRequest(requestBody));
} catch (IOException e) {
throw new KsqlException(e);
}
}

public void runInteractively() {
Expand Down Expand Up @@ -222,25 +236,21 @@ private void handleStatements(final String line)
);

} else if (statementContext.statement() instanceof SqlBaseParser.ListPropertiesContext) {
listProperties(statementText);
makeKsqlRequest(statementText);

} else if (statementContext.statement() instanceof SqlBaseParser.SetPropertyContext) {
setProperty(statementContext);

} else if (statementContext.statement() instanceof SqlBaseParser.UnsetPropertyContext) {
consecutiveStatements = unsetProperty(consecutiveStatements, statementContext);
} else if (statementContext.statement() instanceof SqlBaseParser.RunScriptContext) {
runScript(statementContext, statementText);
} else if (statementContext.statement() instanceof SqlBaseParser.RegisterTopicContext) {
registerTopic(consecutiveStatements, statementContext, statementText);
} else {
consecutiveStatements.append(statementText);
}
}
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
makeKsqlRequest(consecutiveStatements.toString());
}
}

Expand All @@ -256,36 +266,13 @@ private void registerTopic(
consecutiveStatements.append(statementText);
}

private void runScript(
final SqlBaseParser.SingleStatementContext statementContext,
final String statementText
) throws IOException {
final SqlBaseParser.RunScriptContext runScriptContext =
(SqlBaseParser.RunScriptContext) statementContext.statement();
final String schemaFilePath = AstBuilder.unquote(runScriptContext.STRING().getText(), "'");
final String fileContent;
try {
fileContent = new String(Files.readAllBytes(Paths.get(schemaFilePath)), UTF_8);
} catch (final IOException e) {
throw new KsqlException(
" Could not read statements from the provided script file " + schemaFilePath + ": "
+ e + " Make sure the file exists and can be read by KSQL CLI.",
e
);
}
setProperty(KsqlConstants.RUN_SCRIPT_STATEMENTS_CONTENT, fileContent);
printKsqlResponse(
restClient.makeKsqlRequest(statementText)
);
}

private StringBuilder printOrDisplayQueryResults(
final StringBuilder consecutiveStatements,
final SqlBaseParser.SingleStatementContext statementContext,
final String statementText
) throws InterruptedException, IOException, ExecutionException {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(restClient.makeKsqlRequest(consecutiveStatements.toString()));
makeKsqlRequest(consecutiveStatements.toString());
consecutiveStatements.setLength(0);
}
if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext) {
Expand All @@ -296,11 +283,6 @@ private StringBuilder printOrDisplayQueryResults(
return consecutiveStatements;
}

private void listProperties(final String statementText) throws IOException {
final KsqlEntityList ksqlEntityList = restClient.makeKsqlRequest(statementText).getResponse();
terminal.printKsqlEntityList(ksqlEntityList);
}

private void printKsqlResponse(final RestResponse<KsqlEntityList> response) throws IOException {
if (response.isSuccessful()) {
final KsqlEntityList ksqlEntities = response.getResponse();
Expand Down Expand Up @@ -458,11 +440,9 @@ private void setProperty(final String property, final String value) {
private StringBuilder unsetProperty(
final StringBuilder consecutiveStatements,
final SqlBaseParser.SingleStatementContext statementContext
) throws IOException {
) {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
makeKsqlRequest(consecutiveStatements.toString());
consecutiveStatements.setLength(0);
}
final SqlBaseParser.UnsetPropertyContext unsetPropertyContext =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.cli;

public interface KsqlRequestExecutor {

/**
* Execute a request on the KSQL servers and handle the response.
*
* @param body the request body.
*/
void makeKsqlRequest(String body);
}
84 changes: 57 additions & 27 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@

package io.confluent.ksql.cli.console;

import static io.confluent.ksql.util.CmdLineUtil.splitByUnquotedWhitespace;
import static io.confluent.ksql.util.CmdLineUtil.trimTrailingSemiColon;
agavra marked this conversation as resolved.
Show resolved Hide resolved

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.cli.console.KsqlTerminal.StatusClosable;
import io.confluent.ksql.cli.console.cmd.CliCommandRegisterUtil;
import io.confluent.ksql.cli.console.cmd.CliSpecificCommand;
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.cli.console.table.Table.Builder;
Expand All @@ -34,7 +37,6 @@
import io.confluent.ksql.cli.console.table.builder.TableBuilder;
import io.confluent.ksql.cli.console.table.builder.TablesListTableBuilder;
import io.confluent.ksql.cli.console.table.builder.TopicDescriptionTableBuilder;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ExecutionPlan;
import io.confluent.ksql.rest.entity.FieldInfo;
Expand All @@ -61,6 +63,7 @@
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.entity.TablesList;
import io.confluent.ksql.rest.entity.TopicDescription;
import io.confluent.ksql.util.CmdLineUtil;
import io.confluent.ksql.util.HandlerMaps;
import io.confluent.ksql.util.HandlerMaps.ClassHandlerMap1;
import io.confluent.ksql.util.HandlerMaps.Handler1;
Expand All @@ -75,6 +78,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
Expand All @@ -85,7 +89,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class Console implements Closeable {
public class Console implements Closeable {

private static final Logger log = LoggerFactory.getLogger(Console.class);

Expand Down Expand Up @@ -151,11 +155,11 @@ public interface RowCaptor {
void addRows(List<List<String>> fields);
}

public static Console build(final OutputFormat outputFormat, final KsqlRestClient restClient) {
public static Console build(final OutputFormat outputFormat) {
final AtomicReference<Console> consoleRef = new AtomicReference<>();
final Predicate<String> isCliCommand = line -> {
final Console theConsole = consoleRef.get();
return theConsole != null && theConsole.isCliCommand(line);
return theConsole != null && theConsole.getCliCommand(line).isPresent();
};

final Path historyFilePath = Paths.get(System.getProperty(
Expand All @@ -166,19 +170,15 @@ public static Console build(final OutputFormat outputFormat, final KsqlRestClien

final KsqlTerminal terminal = new JLineTerminal(isCliCommand, historyFilePath);

final Supplier<String> versionSuppler =
() -> restClient.getServerInfo().getResponse().getVersion();

final Console console = new Console(
outputFormat, versionSuppler, terminal, new NoOpRowCaptor());
outputFormat, terminal, new NoOpRowCaptor());

consoleRef.set(console);
return console;
}

public Console(
final OutputFormat outputFormat,
final Supplier<String> versionSuppler,
final KsqlTerminal terminal,
final RowCaptor rowCaptor
) {
Expand All @@ -187,8 +187,6 @@ public Console(
this.rowCaptor = Objects.requireNonNull(rowCaptor, "rowCaptor");
this.cliSpecificCommands = Maps.newLinkedHashMap();
this.objectMapper = new ObjectMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);

CliCommandRegisterUtil.registerDefaultCommands(this, versionSuppler);
}

public PrintWriter writer() {
Expand Down Expand Up @@ -289,8 +287,12 @@ public void printKsqlEntityList(final List<KsqlEntity> entityList) throws IOExce
printAsJson(entityList);
break;
case TABULAR:
final boolean showStatements = entityList.size() > 1;
for (final KsqlEntity ksqlEntity : entityList) {
writer().println();
if (showStatements) {
writer().println(ksqlEntity.getStatementText());
}
printAsTable(ksqlEntity);
}
break;
Expand All @@ -303,7 +305,7 @@ public void printKsqlEntityList(final List<KsqlEntity> entityList) throws IOExce
}

public void registerCliSpecificCommand(final CliSpecificCommand cliSpecificCommand) {
cliSpecificCommands.put(cliSpecificCommand.getName(), cliSpecificCommand);
cliSpecificCommands.put(cliSpecificCommand.getName().toLowerCase(), cliSpecificCommand);
}

public void setOutputFormat(final String newFormat) {
Expand All @@ -323,13 +325,21 @@ public OutputFormat getOutputFormat() {
return outputFormat;
}

private boolean isCliCommand(final String line) {
final String[] split = line.split("\\s+", 2);
final String command = split[0]
.trim()
.toLowerCase();
private Optional<CliCmdExecutor> getCliCommand(final String line) {
final List<String> parts = splitByUnquotedWhitespace(trimTrailingSemiColon(line));
if (parts.isEmpty()) {
return Optional.empty();
}

return cliSpecificCommands.containsKey(command);
final String reconstructed = parts.stream()
.collect(Collectors.joining(" "));

final String asLowerCase = reconstructed.toLowerCase();

return cliSpecificCommands.entrySet().stream()
.filter(e -> asLowerCase.startsWith(e.getKey()))
.map(e -> CliCmdExecutor.of(e.getValue(), parts))
.findFirst();
}

private void printAsTable(final GenericRow row) {
Expand Down Expand Up @@ -677,16 +687,36 @@ private boolean maybeHandleCliSpecificCommands(final String line) {
return false;
}

final String[] split = line.split("\\s+", 2);
final String command = split[0].toLowerCase();
return getCliCommand(line)
.map(cmd -> {
cmd.execute(writer());
flush();
return true;
})
.orElse(false);
}

private static final class CliCmdExecutor {

final CliSpecificCommand cliSpecificCommand = cliSpecificCommands.get(command);
if (cliSpecificCommand == null) {
return false;
private final CliSpecificCommand cmd;
private final List<String> args;

private static CliCmdExecutor of(final CliSpecificCommand cmd, final List<String> lineParts) {
final String[] nameParts = cmd.getName().split("\\s+");
final List<String> argList = lineParts.subList(nameParts.length, lineParts.size()).stream()
.map(CmdLineUtil::removeMatchedSingleQuotes)
.collect(Collectors.toList());

return new CliCmdExecutor(cmd, argList);
}

final String commandArg = split.length > 1 ? split[1] : "";
cliSpecificCommand.execute(commandArg);
return true;
private CliCmdExecutor(final CliSpecificCommand cmd, final List<String> args) {
this.cmd = Objects.requireNonNull(cmd, "cmd");
this.args = ImmutableList.copyOf(Objects.requireNonNull(args, "args"));
}

public void execute(final PrintWriter terminal) {
cmd.execute(args, terminal);
}
}
}
Loading