Skip to content

Commit

Permalink
Upgrade kudu client to 1.15.0
Browse files Browse the repository at this point in the history
Upgrading the kudu client revealed a few problems:
1. Timeouts to kudu tablets were sometimes occurring during deletes due
to a bug in the kudu java client in version 1.13.0.

2. Timeouts were *not* failing query execution because the kudu
connector was configured to flush operations in the background.

3. The two combined above meant tests that did deletes sometimes
actually did not perform deletes and would fail.

This patch upgrades the kudu client, explicitly fails trino execution
when kudu rpcs timeout, and marks unsupported data types from kudu
1.15.0.
  • Loading branch information
grantatspothero authored and hashhar committed Feb 16, 2022
1 parent a6f75d8 commit 103dbcf
Show file tree
Hide file tree
Showing 16 changed files with 175 additions and 21 deletions.
2 changes: 1 addition & 1 deletion docs/src/main/sphinx/connector/kudu.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Requirements

To connect to Kudu, you need:

* Kudu version 1.10 or higher.
* Kudu version 1.13.0 or higher.
* Network access from the Trino coordinator and workers to Kudu. Port 7051 is
the default port.

Expand Down
2 changes: 1 addition & 1 deletion plugin/trino-kudu/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<kudu.version>1.10.0</kudu.version>
<kudu.version>1.15.0</kudu.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ public List<KuduSplit> buildKuduSplits(KuduTableHandle tableHandle, DynamicFilte
KuduTable table = tableHandle.getTable(this);
int primaryKeyColumnCount = table.getSchema().getPrimaryKeyColumnCount();
KuduScanToken.KuduScanTokenBuilder builder = client.newScanTokenBuilder(table);
// TODO: remove when kudu client bug is fixed: https://gerrit.cloudera.org/#/c/18166/
builder.includeTabletMetadata(false);

TupleDomain<ColumnHandle> constraint = tableHandle.getConstraint()
.intersect(dynamicFilter.getCurrentPredicate().simplify(100));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Upsert;

import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -59,6 +58,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.kudu.client.KuduOperationApplier.applyOperationAndVerifySucceeded;

public class KuduPageSink
implements ConnectorPageSink
Expand Down Expand Up @@ -103,7 +103,6 @@ private KuduPageSink(

this.table = table;
this.session = clientSession.newSession();
this.session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
uuid = UUID.randomUUID().toString();
}

Expand All @@ -125,7 +124,7 @@ public CompletableFuture<?> appendPage(Page page)
}

try {
session.apply(upsert);
applyOperationAndVerifySucceeded(session, upsert);
}
catch (KuduException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.SessionConfiguration.FlushMode;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import static org.apache.kudu.client.KuduOperationApplier.applyOperationAndVerifySucceeded;

public class KuduUpdatablePageSource
implements UpdatablePageSource
{
Expand All @@ -50,7 +51,6 @@ public void deleteRows(Block rowIds)
{
Schema schema = table.getSchema();
KuduSession session = clientSession.newSession();
session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND);
try {
try {
for (int i = 0; i < rowIds.getPositionCount(); i++) {
Expand All @@ -59,7 +59,7 @@ public void deleteRows(Block rowIds)
PartialRow row = KeyEncoderAccessor.decodePrimaryKey(schema, slice.getBytes());
Delete delete = table.newDelete();
RowHelper.copyPrimaryKey(schema, row, delete.getRow());
session.apply(delete);
applyOperationAndVerifySucceeded(session, delete);
}
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ private static Type fromKuduClientType(org.apache.kudu.Type ktype, ColumnTypeAtt
return VarbinaryType.VARBINARY;
case DECIMAL:
return DecimalType.createDecimalType(attributes.getPrecision(), attributes.getScale());
// TODO: add support for varchar and date types: https://github.com/trinodb/trino/issues/11009
case VARCHAR:
case DATE:
break;
}
throw new IllegalStateException("Kudu type not implemented for " + ktype);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,10 @@ private static Object toValue(Schema schema, PartialRow bound, Integer idx)
return bound.getBoolean(idx);
case BINARY:
return bound.getBinaryCopy(idx);
// TODO: add support for varchar and date types: https://github.com/trinodb/trino/issues/11009
case VARCHAR:
case DATE:
break;
}
throw new IllegalStateException("Unhandled type " + type + " for range partition");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Upsert;

import java.util.ArrayList;
Expand All @@ -41,6 +40,7 @@
import static io.trino.plugin.kudu.KuduClientSession.DEFAULT_SCHEMA;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
import static org.apache.kudu.client.KuduOperationApplier.applyOperationAndVerifySucceeded;

public class SchemaEmulationByTableNameConvention
implements SchemaEmulation
Expand Down Expand Up @@ -68,7 +68,7 @@ public void createSchema(KuduClient client, String schemaName)
try {
Upsert upsert = schemasTable.newUpsert();
upsert.getRow().addString(0, schemaName);
session.apply(upsert);
applyOperationAndVerifySucceeded(session, upsert);
}
finally {
session.close();
Expand Down Expand Up @@ -110,7 +110,7 @@ public void dropSchema(KuduClient client, String schemaName)
try {
Delete delete = schemasTable.newDelete();
delete.getRow().addString(0, schemaName);
session.apply(delete);
applyOperationAndVerifySucceeded(session, delete);
}
finally {
session.close();
Expand Down Expand Up @@ -170,11 +170,10 @@ private void createAndFillSchemasTable(KuduClient client)
KuduTable schemasTable = client.createTable(rawSchemasTableName, schema, options);
KuduSession session = client.newSession();
try {
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
for (String schemaName : existingSchemaNames) {
Insert insert = schemasTable.newInsert();
insert.getRow().addString(0, schemaName);
session.apply(insert);
applyOperationAndVerifySucceeded(session, insert);
}
}
finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kudu.client;

import io.trino.spi.TrinoException;

import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.lang.String.format;

/**
* Operation.getChangeType() is package private
*/
public final class KuduOperationApplier
{
private KuduOperationApplier()
{
}

public static OperationResponse applyOperationAndVerifySucceeded(KuduSession kuduSession, Operation operation)
throws KuduException
{
OperationResponse operationResponse = kuduSession.apply(operation);
if (operationResponse != null && operationResponse.hasRowError()) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Error while applying kudu operation %s: %s",
operation.getChangeType().toString(), operationResponse.getRowError()));
}
return operationResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ public abstract class AbstractKuduIntegrationSmokeTest
{
private TestingKuduServer kuduServer;

protected abstract String getKuduServerVersion();

protected abstract Optional<String> getKuduSchemaEmulationPrefix();

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
kuduServer = new TestingKuduServer();
kuduServer = new TestingKuduServer(getKuduServerVersion());
return createKuduQueryRunnerTpch(kuduServer, getKuduSchemaEmulationPrefix(), CUSTOMER, NATION, ORDERS, REGION);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import java.util.Optional;

public class TestKuduSmokeTestWithDisabledInferSchema
public abstract class AbstractKuduSmokeTestWithDisabledInferSchema
extends AbstractKuduIntegrationSmokeTest
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import java.util.Optional;

public class TestKuduSmokeTestWithEmptyInferSchema
public abstract class AbstractKuduSmokeTestWithEmptyInferSchema
extends AbstractKuduIntegrationSmokeTest
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import static org.testng.Assert.assertEquals;

public class TestKuduSmokeTestWithStandardInferSchema
public abstract class AbstractKuduSmokeTestWithStandardInferSchema
extends AbstractKuduIntegrationSmokeTest
{
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.kudu;

public class KuduLatestSmokeTests
{
private static final String KUDU_VERSION = "1.15.0";

public static class TestKuduSmokeTestWithDisabledInferSchema
extends AbstractKuduSmokeTestWithDisabledInferSchema
{
@Override
protected String getKuduServerVersion()
{
return KUDU_VERSION;
}
}

public static class TestKuduSmokeTestWithEmptyInferSchema
extends AbstractKuduSmokeTestWithEmptyInferSchema
{
@Override
protected String getKuduServerVersion()
{
return KUDU_VERSION;
}
}

public static class TestKuduSmokeTestWithStandardInferSchema
extends AbstractKuduSmokeTestWithStandardInferSchema
{
@Override
protected String getKuduServerVersion()
{
return KUDU_VERSION;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.kudu;

public class KuduSmokeTests
{
private static final String KUDU_VERSION = "1.13.0";

public static class TestKuduSmokeTestWithDisabledInferSchema
extends AbstractKuduSmokeTestWithDisabledInferSchema
{
@Override
protected String getKuduServerVersion()
{
return KUDU_VERSION;
}
}

public static class TestKuduSmokeTestWithEmptyInferSchema
extends AbstractKuduSmokeTestWithEmptyInferSchema
{
@Override
protected String getKuduServerVersion()
{
return KUDU_VERSION;
}
}

public static class TestKuduSmokeTestWithStandardInferSchema
extends AbstractKuduSmokeTestWithStandardInferSchema
{
@Override
protected String getKuduServerVersion()
{
return KUDU_VERSION;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public class TestingKuduServer
implements Closeable
{
private static final String KUDU_IMAGE = "apache/kudu:1.10.0";
private static final String KUDU_IMAGE = "apache/kudu";
private static final Integer KUDU_MASTER_PORT = 7051;
private static final Integer KUDU_TSERVER_PORT = 7050;
private static final Integer NUMBER_OF_REPLICA = 3;
Expand All @@ -47,21 +47,27 @@ public class TestingKuduServer
private final GenericContainer<?> master;
private final List<GenericContainer<?>> tServers;

public TestingKuduServer()
{
// This version should match the kudu client version
this("1.15.0");
}

/**
* Kudu tablets needs to know the host/mapped port it will be bound to in order to configure --rpc_advertised_addresses
* However when using non-fixed ports in testcontainers, we only know the mapped port after the container starts up
* In order to workaround this, create a proxy to forward traffic from the host to the underlying tablets
* Since the ToxiProxy container starts up *before* kudu, we know the mapped port when configuring the kudu tablets
*/
public TestingKuduServer()
public TestingKuduServer(String kuduVersion)
{
network = Network.newNetwork();
ImmutableList.Builder<GenericContainer<?>> tServersBuilder = ImmutableList.builder();

String hostIP = getHostIPAddress();

String masterContainerAlias = "kudu-master";
this.master = new GenericContainer<>(KUDU_IMAGE)
this.master = new GenericContainer<>(format("%s:%s", KUDU_IMAGE, kuduVersion))
.withExposedPorts(KUDU_MASTER_PORT)
.withCommand("master")
.withNetwork(network)
Expand All @@ -75,7 +81,7 @@ public TestingKuduServer()
for (int instance = 0; instance < NUMBER_OF_REPLICA; instance++) {
String instanceName = "kudu-tserver-" + instance;
ToxiproxyContainer.ContainerProxy proxy = toxiProxy.getProxy(instanceName, KUDU_TSERVER_PORT);
GenericContainer<?> tableServer = new GenericContainer<>(KUDU_IMAGE)
GenericContainer<?> tableServer = new GenericContainer<>(format("%s:%s", KUDU_IMAGE, kuduVersion))
.withExposedPorts(KUDU_TSERVER_PORT)
.withCommand("tserver")
.withEnv("KUDU_MASTERS", format("%s:%s", masterContainerAlias, KUDU_MASTER_PORT))
Expand Down

0 comments on commit 103dbcf

Please sign in to comment.