Skip to content

Commit

Permalink
Merge branch 'trinodb:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
behrooz-stripe authored Sep 17, 2024
2 parents fd23000 + f3b8ad0 commit 1e2edd3
Show file tree
Hide file tree
Showing 558 changed files with 7,109 additions and 2,661 deletions.
6 changes: 3 additions & 3 deletions .github/DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,17 +240,17 @@ folder). You must have [Node.js](https://nodejs.org/en/download/) and
[Yarn](https://yarnpkg.com/en/) installed to execute these commands. To update
this folder after making changes, simply run:

yarn --cwd core/trino-main/src/main/resources/webapp/src install
yarn --cwd core/trino-web-ui/src/main/resources/webapp/src install

If no Javascript dependencies have changed (i.e., no changes to `package.json`),
it is faster to run:

yarn --cwd core/trino-main/src/main/resources/webapp/src run package
yarn --cwd core/trino-web-ui/src/main/resources/webapp/src run package

To simplify iteration, you can also run in `watch` mode, which automatically
re-compiles when changes to source files are detected:

yarn --cwd core/trino-main/src/main/resources/webapp/src run watch
yarn --cwd core/trino-web-ui/src/main/resources/webapp/src run watch

To iterate quickly, simply re-build the project in IntelliJ after packaging is
complete. Project resources will be hot-reloaded and changes are reflected on
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ jobs:
run: |
export MAVEN_OPTS="${MAVEN_INSTALL_OPTS}"
# Skip checks, these are run in `maven-checks` job and e.g. checkstyle is expensive.
$MAVEN ${MAVEN_TEST} -T 1C clean verify -DskipTests -Dair.check.skip-all=true ${MAVEN_GIB} -Dgib.buildUpstream=never -P errorprone-compiler \
$MAVEN ${MAVEN_TEST} -T 1C clean compile test-compile -DskipTests -Dair.check.skip-all=true ${MAVEN_GIB} -Dgib.buildUpstream=never -P errorprone-compiler \
-pl '!:trino-docs,!:trino-server,!:trino-server-rpm'
test-jdbc-compatibility:
Expand Down
2 changes: 1 addition & 1 deletion client/trino-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.trino</groupId>
<artifactId>trino-root</artifactId>
<version>456-SNAPSHOT</version>
<version>458-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import static io.trino.client.uri.PropertyName.CLIENT_INFO;
import static io.trino.client.uri.PropertyName.CLIENT_TAGS;
import static io.trino.client.uri.PropertyName.DISABLE_COMPRESSION;
import static io.trino.client.uri.PropertyName.ENCODING;
import static io.trino.client.uri.PropertyName.EXTERNAL_AUTHENTICATION;
import static io.trino.client.uri.PropertyName.EXTERNAL_AUTHENTICATION_REDIRECT_HANDLERS;
import static io.trino.client.uri.PropertyName.EXTRA_CREDENTIALS;
Expand Down Expand Up @@ -286,6 +287,10 @@ public class ClientOptions
@Option(names = "--disable-compression", description = "Disable compression of query results")
public boolean disableCompression;

@PropertyMapping(ENCODING)
@Option(names = "--encoding", paramLabel = "<encoding>", description = "Experimental spooled protocol encoding [available: ${ENCODINGS}]")
public Optional<String> encoding = Optional.empty();

@Option(names = "--editing-mode", paramLabel = "<editing-mode>", defaultValue = "EMACS", description = "Editing mode [${COMPLETION-CANDIDATES}] " + DEFAULT_VALUE)
public EditingMode editingMode;

Expand Down Expand Up @@ -337,6 +342,7 @@ public ClientSession toClientSession(TrinoUri uri)
return uri
.toClientSessionBuilder()
.source(uri.getSource().orElse(SOURCE_DEFAULT))
.encoding(encoding)
.build();
}

Expand Down
7 changes: 5 additions & 2 deletions client/trino-cli/src/main/java/io/trino/cli/QueryRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.trino.client.StatementClient;
import io.trino.client.uri.HttpClientFactory;
import io.trino.client.uri.TrinoUri;
import okhttp3.Call;
import okhttp3.OkHttpClient;

import java.io.Closeable;
Expand All @@ -33,11 +32,15 @@ public class QueryRunner
private final AtomicReference<ClientSession> session;
private final boolean debug;
private final OkHttpClient httpClient;
private final OkHttpClient segmentHttpClient;

public QueryRunner(TrinoUri uri, ClientSession session, boolean debug)
{
this.session = new AtomicReference<>(requireNonNull(session, "session is null"));
this.httpClient = HttpClientFactory.toHttpClientBuilder(uri, session.getSource()).build();
this.segmentHttpClient = HttpClientFactory
.unauthenticatedClientBuilder(uri, session.getSource())
.build();
this.debug = debug;
}

Expand Down Expand Up @@ -68,7 +71,7 @@ public StatementClient startInternalQuery(String query)

private StatementClient startInternalQuery(ClientSession session, String query)
{
return newStatementClient((Call.Factory) httpClient, session, query);
return newStatementClient(httpClient, segmentHttpClient, session, query);
}

@Override
Expand Down
33 changes: 33 additions & 0 deletions client/trino-cli/src/main/java/io/trino/cli/Trino.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package io.trino.cli;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import io.airlift.units.Duration;
import io.trino.cli.ClientOptions.ClientExtraCredential;
Expand All @@ -27,15 +29,20 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Enumeration;
import java.util.Map;
import java.util.Optional;
import java.util.ResourceBundle;
import java.util.stream.Stream;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.StandardSystemProperty.USER_HOME;
import static com.google.common.base.Strings.emptyToNull;
import static com.google.common.base.Throwables.getStackTraceAsString;
import static io.trino.cli.ClientOptions.DEBUG_OPTION_NAME;
import static io.trino.client.spooling.encoding.QueryDataDecoders.getSupportedEncodings;
import static java.lang.System.getenv;
import static java.util.Collections.enumeration;
import static java.util.regex.Pattern.quote;

public final class Trino
Expand All @@ -56,6 +63,7 @@ public static CommandLine createCommandLine(Object command)
.registerConverter(ClientExtraCredential.class, ClientExtraCredential::new)
.registerConverter(HostAndPort.class, HostAndPort::fromString)
.registerConverter(Duration.class, Duration::valueOf)
.setResourceBundle(new TrinoResourceBundle())
.setExecutionExceptionHandler((e, cmd, parseResult) -> {
System.err.println(formatCliErrorMessage(e, parseResult.hasMatchedOption(DEBUG_OPTION_NAME)));
return 1;
Expand Down Expand Up @@ -120,4 +128,29 @@ public String[] getVersion()
return new String[] {"Trino CLI " + firstNonNull(version, "(version unknown)")};
}
}

public static class TrinoResourceBundle
extends ResourceBundle
{
private final Map<String, String> variables;

public TrinoResourceBundle()
{
this.variables = ImmutableMap.<String, String>builder()
.put("ENCODINGS", Joiner.on(", ").join(getSupportedEncodings()))
.buildOrThrow();
}

@Override
protected Object handleGetObject(String key)
{
return variables.get(key);
}

@Override
public Enumeration<String> getKeys()
{
return enumeration(variables.keySet());
}
}
}
2 changes: 1 addition & 1 deletion client/trino-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.trino</groupId>
<artifactId>trino-root</artifactId>
<version>456-SNAPSHOT</version>
<version>458-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class ClientSession
private final String transactionId;
private final Duration clientRequestTimeout;
private final boolean compressionDisabled;
private Optional<String> encoding;

public static Builder builder()
{
Expand Down Expand Up @@ -95,7 +96,8 @@ private ClientSession(
Map<String, String> extraCredentials,
String transactionId,
Duration clientRequestTimeout,
boolean compressionDisabled)
boolean compressionDisabled,
Optional<String> encoding)
{
this.server = requireNonNull(server, "server is null");
this.principal = requireNonNull(principal, "principal is null");
Expand All @@ -118,6 +120,7 @@ private ClientSession(
this.extraCredentials = ImmutableMap.copyOf(requireNonNull(extraCredentials, "extraCredentials is null"));
this.clientRequestTimeout = clientRequestTimeout;
this.compressionDisabled = compressionDisabled;
this.encoding = requireNonNull(encoding, "encoding is null");

for (String clientTag : clientTags) {
checkArgument(!clientTag.contains(","), "client tag cannot contain ','");
Expand Down Expand Up @@ -261,6 +264,11 @@ public boolean isCompressionDisabled()
return compressionDisabled;
}

public Optional<String> getEncoding()
{
return encoding;
}

@Override
public String toString()
{
Expand All @@ -283,6 +291,7 @@ public String toString()
.add("resourceEstimates", resourceEstimates)
.add("clientRequestTimeout", clientRequestTimeout)
.add("compressionDisabled", compressionDisabled)
.add("encoding", encoding)
.omitNullValues()
.toString();
}
Expand Down Expand Up @@ -310,6 +319,7 @@ public static final class Builder
private String transactionId;
private Duration clientRequestTimeout;
private boolean compressionDisabled;
private Optional<String> encoding = Optional.empty();

private Builder() {}

Expand Down Expand Up @@ -337,6 +347,7 @@ private Builder(ClientSession clientSession)
transactionId = clientSession.getTransactionId();
clientRequestTimeout = clientSession.getClientRequestTimeout();
compressionDisabled = clientSession.isCompressionDisabled();
encoding = clientSession.getEncoding();
}

public Builder server(URI server)
Expand Down Expand Up @@ -465,6 +476,12 @@ public Builder compressionDisabled(boolean compressionDisabled)
return this;
}

public Builder encoding(Optional<String> encoding)
{
this.encoding = encoding;
return this;
}

public ClientSession build()
{
return new ClientSession(
Expand All @@ -488,7 +505,8 @@ public ClientSession build()
credentials,
transactionId,
clientRequestTimeout,
compressionDisabled);
compressionDisabled,
encoding);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ interface Factory
{
QueryDataDecoder create(List<Column> columns, DataAttributes queryAttributes);

String encodingId();
String encoding();
}

Iterable<List<Object>> decode(InputStream input, DataAttributes segmentAttributes)
throws IOException;

String encodingId();
String encoding();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,23 @@ public final class StatementClientFactory
{
private StatementClientFactory() {}

public static StatementClient newStatementClient(Call.Factory httpCallFactory, ClientSession session, String query)
public static StatementClient newStatementClient(Call.Factory httpCallFactory, Call.Factory segmentHttpCallFactory, ClientSession session, String query)
{
return new StatementClientV1(httpCallFactory, session, query, Optional.empty());
return new StatementClientV1(httpCallFactory, segmentHttpCallFactory, session, query, Optional.empty());
}

public static StatementClient newStatementClient(OkHttpClient httpClient, ClientSession session, String query, Optional<Set<String>> clientCapabilities)
public static StatementClient newStatementClient(OkHttpClient httpClient, Call.Factory segmentHttpCallFactory, ClientSession session, String query, Optional<Set<String>> clientCapabilities)
{
return new StatementClientV1((Call.Factory) httpClient, session, query, clientCapabilities);
return new StatementClientV1((Call.Factory) httpClient, segmentHttpCallFactory, session, query, clientCapabilities);
}

public static StatementClient newStatementClient(OkHttpClient httpClient, QueryDataDecoder.Factory decoderFactory, ClientSession session, String query, Optional<Set<String>> clientCapabilities)
public static StatementClient newStatementClient(Call.Factory httpCallFactory, ClientSession session, String query)
{
return new StatementClientV1(httpCallFactory, new OkHttpClient(), session, query, Optional.empty());
}

public static StatementClient newStatementClient(OkHttpClient httpClient, ClientSession session, String query, Optional<Set<String>> clientCapabilities)
{
return new StatementClientV1((Call.Factory) httpClient, Optional.of(decoderFactory), session, query, clientCapabilities);
return new StatementClientV1((Call.Factory) httpClient, new OkHttpClient(), session, query, clientCapabilities);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.client.spooling.DataAttributes;
import io.trino.client.spooling.EncodedQueryData;
import io.trino.client.spooling.SegmentLoader;
import io.trino.client.spooling.encoding.QueryDataDecoders;
import jakarta.annotation.Nullable;
import okhttp3.Call;
import okhttp3.Headers;
Expand Down Expand Up @@ -108,16 +109,10 @@ class StatementClientV1
private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

// Encoded data
private final Optional<QueryDataDecoder.Factory> queryDataDecoderFactory;
private final SegmentLoader segmentDownloader;
private final SegmentLoader segmentLoader;
private final AtomicReference<QueryDataDecoder> decoder = new AtomicReference<>();

public StatementClientV1(Call.Factory httpCallFactory, ClientSession session, String query, Optional<Set<String>> clientCapabilities)
{
this(httpCallFactory, Optional.empty(), session, query, clientCapabilities);
}

public StatementClientV1(Call.Factory httpCallFactory, Optional<QueryDataDecoder.Factory> queryDataDecoder, ClientSession session, String query, Optional<Set<String>> clientCapabilities)
public StatementClientV1(Call.Factory httpCallFactory, Call.Factory segmentHttpCallFactory, ClientSession session, String query, Optional<Set<String>> clientCapabilities)
{
requireNonNull(httpCallFactory, "httpCallFactory is null");
requireNonNull(session, "session is null");
Expand All @@ -139,10 +134,9 @@ public StatementClientV1(Call.Factory httpCallFactory, Optional<QueryDataDecoder
.map(Enum::name)
.collect(toImmutableSet())));
this.compressionDisabled = session.isCompressionDisabled();
this.queryDataDecoderFactory = requireNonNull(queryDataDecoder, "queryDataDecoder is null");
this.segmentDownloader = new SegmentLoader();
this.segmentLoader = new SegmentLoader(requireNonNull(segmentHttpCallFactory, "segmentHttpCallFactory is null"));

Request request = buildQueryRequest(session, query, queryDataDecoder.map(QueryDataDecoder.Factory::encodingId));
Request request = buildQueryRequest(session, query, session.getEncoding());
// Pass empty as materializedJsonSizeLimit to always materialize the first response
// to avoid losing the response body if the initial response parsing fails
executeRequest(request, "starting query", OptionalLong.empty(), this::isTransient);
Expand Down Expand Up @@ -280,7 +274,7 @@ public QueryData currentData()
}

EncodedQueryData queryData = (EncodedQueryData) queryResults.getData();
return queryData.toRawData(decoder.get(), segmentDownloader);
return queryData.toRawData(decoder.get(), segmentLoader);
}

@Override
Expand Down Expand Up @@ -520,13 +514,14 @@ private void processResponse(Headers headers, QueryResults results)
EncodedQueryData encodedData = (EncodedQueryData) results.getData();
DataAttributes queryAttributed = encodedData.getMetadata();
if (decoder.get() == null) {
QueryDataDecoder queryDataDecoder = queryDataDecoderFactory
.orElseThrow(() -> new IllegalStateException("Received encoded data format but there is no decoder"))
verify(QueryDataDecoders.exists(encodedData.getEncoding()), "Received encoded data format but there is no decoder matching %s", encodedData.getEncoding());
QueryDataDecoder queryDataDecoder = QueryDataDecoders
.get(encodedData.getEncoding())
.create(results.getColumns(), queryAttributed);
decoder.set(queryDataDecoder);
}

verify(decoder.get().encodingId().equals(encodedData.getEncodingId()), "Decoder has wrong encoding id, expected %s, got %s", encodedData.getEncodingId(), decoder.get().encodingId());
verify(decoder.get().encoding().equals(encodedData.getEncoding()), "Decoder has wrong encoding id, expected %s, got %s", encodedData.getEncoding(), decoder.get().encoding());
}

currentResults.set(results);
Expand Down Expand Up @@ -576,7 +571,6 @@ public void close()
if (uri != null) {
httpDelete(uri);
}
segmentDownloader.close();
}
}

Expand Down
Loading

0 comments on commit 1e2edd3

Please sign in to comment.