Implement inserts for kafka connector
charlesjmorgan committed Jun 30, 2020
1 parent 6d936a4 commit acdfe1b
Showing 33 changed files with 2,474 additions and 27 deletions.
10 changes: 10 additions & 0 deletions presto-kafka/pom.xml
@@ -93,6 +93,16 @@
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>net.sf.opencsv</groupId>
<artifactId>opencsv</artifactId>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>

<!-- Presto SPI -->
<dependency>
<groupId>io.prestosql</groupId>
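The two new dependencies back the row encoders this commit introduces for CSV- and Avro-formatted topics (the encoder files themselves are not among the hunks loaded on this page). As a rough illustration of the kind of work the Avro dependency is pulled in for, here is a standalone sketch of serializing one row to Avro bytes; the schema and field names are invented for the example:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class AvroRowSketch
{
    public static void main(String[] args)
            throws IOException
    {
        // Invented single-column schema, standing in for a table's message schema
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"row\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("id", 42L);

        // Serialize the record; bytes like these would become a Kafka message value
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        byte[] message = out.toByteArray();
        System.out.println(message.length + " bytes");
    }
}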
presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaColumnHandle.java
@@ -16,6 +16,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.prestosql.decoder.DecoderColumnHandle;
import io.prestosql.plugin.kafka.encoder.EncoderColumnHandle;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.type.Type;

@@ -25,7 +26,7 @@
import static java.util.Objects.requireNonNull;

public final class KafkaColumnHandle
implements DecoderColumnHandle, Comparable<KafkaColumnHandle>
implements EncoderColumnHandle, DecoderColumnHandle, Comparable<KafkaColumnHandle>
{
private final int ordinalPosition;

@@ -92,12 +93,6 @@ public KafkaColumnHandle(
this.internal = internal;
}

@JsonProperty
public int getOrdinalPosition()
{
return ordinalPosition;
}

@Override
@JsonProperty
public String getName()
@@ -152,6 +147,13 @@ public boolean isInternal()
return internal;
}

@Override
@JsonProperty
public int getOrdinalPosition()
{
return ordinalPosition;
}

ColumnMetadata getColumnMetadata()
{
return ColumnMetadata.builder()
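KafkaColumnHandle now implements the new EncoderColumnHandle interface alongside DecoderColumnHandle. That interface is added elsewhere in this commit and is not among the hunks shown; inferred from the getters KafkaColumnHandle exposes, a minimal sketch would be:

package io.prestosql.plugin.kafka.encoder;

import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.type.Type;

// Sketch only, inferred from KafkaColumnHandle's getters; the interface in the
// commit may declare a different set of methods.
public interface EncoderColumnHandle
        extends ColumnHandle
{
    String getName();

    Type getType();

    String getDataFormat();

    String getFormatHint();

    boolean isInternal();

    int getOrdinalPosition();
}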
presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnector.java
@@ -16,6 +16,7 @@
import io.airlift.bootstrap.LifeCycleManager;
import io.prestosql.spi.connector.Connector;
import io.prestosql.spi.connector.ConnectorMetadata;
import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorRecordSetProvider;
import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
@@ -34,18 +35,21 @@ public class KafkaConnector
private final ConnectorMetadata metadata;
private final ConnectorSplitManager splitManager;
private final ConnectorRecordSetProvider recordSetProvider;
private final ConnectorPageSinkProvider pageSinkProvider;

@Inject
public KafkaConnector(
LifeCycleManager lifeCycleManager,
ConnectorMetadata metadata,
ConnectorSplitManager splitManager,
ConnectorRecordSetProvider recordSetProvider)
ConnectorRecordSetProvider recordSetProvider,
ConnectorPageSinkProvider pageSinkProvider)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
}

@Override
@@ -73,6 +77,12 @@ public ConnectorRecordSetProvider getRecordSetProvider()
return recordSetProvider;
}

@Override
public ConnectorPageSinkProvider getPageSinkProvider()
{
return pageSinkProvider;
}

@Override
public final void shutdown()
{
presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorModule.java
@@ -19,10 +19,13 @@
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.prestosql.decoder.DecoderModule;
import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider;
import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorRecordSetProvider;
import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
import io.prestosql.plugin.base.classloader.ForClassLoaderSafe;
import io.prestosql.plugin.kafka.encoder.EncoderModule;
import io.prestosql.spi.connector.ConnectorMetadata;
import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorRecordSetProvider;
import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.type.Type;
@@ -48,6 +51,8 @@ public void configure(Binder binder)
binder.bind(ConnectorSplitManager.class).to(ClassLoaderSafeConnectorSplitManager.class).in(Scopes.SINGLETON);
binder.bind(ConnectorRecordSetProvider.class).annotatedWith(ForClassLoaderSafe.class).to(KafkaRecordSetProvider.class).in(Scopes.SINGLETON);
binder.bind(ConnectorRecordSetProvider.class).to(ClassLoaderSafeConnectorRecordSetProvider.class).in(Scopes.SINGLETON);
binder.bind(ConnectorPageSinkProvider.class).annotatedWith(ForClassLoaderSafe.class).to(KafkaPageSinkProvider.class).in(Scopes.SINGLETON);
binder.bind(ConnectorPageSinkProvider.class).to(ClassLoaderSafeConnectorPageSinkProvider.class).in(Scopes.SINGLETON);
binder.bind(KafkaConnector.class).in(Scopes.SINGLETON);

configBinder(binder).bindConfig(KafkaConfig.class);
@@ -57,6 +62,7 @@ public void configure(Binder binder)
jsonCodecBinder(binder).bindJsonCodec(KafkaTopicDescription.class);

binder.install(new DecoderModule());
binder.install(new EncoderModule());
}

private static final class TypeDeserializer
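The page sink provider gets the same double binding as the record set provider: the concrete KafkaPageSinkProvider is bound under @ForClassLoaderSafe, and the plain ConnectorPageSinkProvider binding is a class-loader-safe wrapper around it. The wrapper lives in presto-plugin-toolkit and is not shown here; a minimal sketch of the delegation pattern it implements, assuming it mirrors the existing record-set wrapper:

import io.prestosql.spi.classloader.ThreadContextClassLoader;
import io.prestosql.spi.connector.ConnectorInsertTableHandle;
import io.prestosql.spi.connector.ConnectorOutputTableHandle;
import io.prestosql.spi.connector.ConnectorPageSink;
import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTransactionHandle;

import static java.util.Objects.requireNonNull;

// Sketch of the wrapper pattern: every SPI call runs with the plugin's class
// loader as the thread context class loader, so connector code never sees the
// engine's loader.
public class ClassLoaderSafeConnectorPageSinkProvider
        implements ConnectorPageSinkProvider
{
    private final ConnectorPageSinkProvider delegate;
    private final ClassLoader classLoader;

    public ClassLoaderSafeConnectorPageSinkProvider(ConnectorPageSinkProvider delegate, ClassLoader classLoader)
    {
        this.delegate = requireNonNull(delegate, "delegate is null");
        this.classLoader = requireNonNull(classLoader, "classLoader is null");
    }

    @Override
    public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle)
    {
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
            return delegate.createPageSink(transactionHandle, session, outputTableHandle);
        }
    }

    @Override
    public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle)
    {
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
            return delegate.createPageSink(transactionHandle, session, insertTableHandle);
        }
    }
}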
presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaHandleResolver.java
@@ -15,6 +15,7 @@

import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorHandleResolver;
import io.prestosql.spi.connector.ConnectorInsertTableHandle;
import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
@@ -37,6 +38,12 @@ public Class<? extends ColumnHandle> getColumnHandleClass()
return KafkaColumnHandle.class;
}

@Override
public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
{
return KafkaTableHandle.class;
}

@Override
public Class<? extends ConnectorSplit> getSplitClass()
{
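getInsertTableHandleClass returns KafkaTableHandle, so the table handle itself doubles as the insert handle. The KafkaTableHandle file is not among the hunks shown, but the constructor calls in KafkaMetadata below imply a shape roughly like this (the Optional<String> types for the schema locations are an assumption):

import io.prestosql.spi.connector.ConnectorInsertTableHandle;
import io.prestosql.spi.connector.ConnectorTableHandle;

import java.util.List;
import java.util.Optional;

// Sketch implied by the resolver above and the constructor calls in
// KafkaMetadata; JSON annotations and getters are omitted.
public final class KafkaTableHandle
        implements ConnectorTableHandle, ConnectorInsertTableHandle
{
    private final String schemaName;
    private final String tableName;
    private final String topicName;
    private final String keyDataFormat;
    private final String messageDataFormat;
    private final Optional<String> keyDataSchemaLocation;      // assumption: Optional<String>
    private final Optional<String> messageDataSchemaLocation;  // assumption: Optional<String>
    private final List<KafkaColumnHandle> columns;             // new in this commit

    public KafkaTableHandle(
            String schemaName,
            String tableName,
            String topicName,
            String keyDataFormat,
            String messageDataFormat,
            Optional<String> keyDataSchemaLocation,
            Optional<String> messageDataSchemaLocation,
            List<KafkaColumnHandle> columns)
    {
        this.schemaName = schemaName;
        this.tableName = tableName;
        this.topicName = topicName;
        this.keyDataFormat = keyDataFormat;
        this.messageDataFormat = messageDataFormat;
        this.keyDataSchemaLocation = keyDataSchemaLocation;
        this.messageDataSchemaLocation = messageDataSchemaLocation;
        this.columns = columns;
    }
}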
presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaMetadata.java
@@ -15,20 +15,25 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import io.prestosql.decoder.dummy.DummyRowDecoder;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorInsertTableHandle;
import io.prestosql.spi.connector.ConnectorMetadata;
import io.prestosql.spi.connector.ConnectorOutputMetadata;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.ConnectorTableProperties;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.connector.SchemaTablePrefix;
import io.prestosql.spi.connector.TableNotFoundException;
import io.prestosql.spi.statistics.ComputedStatistics;

import javax.inject.Inject;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -82,7 +87,10 @@ public KafkaTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
getDataFormat(kafkaTopicDescription.getKey()),
getDataFormat(kafkaTopicDescription.getMessage()),
kafkaTopicDescription.getKey().flatMap(KafkaTopicFieldGroup::getDataSchema),
kafkaTopicDescription.getMessage().flatMap(KafkaTopicFieldGroup::getDataSchema)))
kafkaTopicDescription.getMessage().flatMap(KafkaTopicFieldGroup::getDataSchema),
getColumnHandles(schemaTableName).values().stream()
.map(columnHandle -> (KafkaColumnHandle) columnHandle)
.collect(toImmutableList())))
.orElse(null);
}

@@ -107,13 +115,16 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
.collect(toImmutableList());
}

@SuppressWarnings("ValueOfIncrementOrDecrementUsed")
@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
KafkaTableHandle kafkaTableHandle = convertTableHandle(tableHandle);
return getColumnHandles(kafkaTableHandle.toSchemaTableName());
}

SchemaTableName schemaTableName = kafkaTableHandle.toSchemaTableName();
@SuppressWarnings("ValueOfIncrementOrDecrementUsed")
private Map<String, ColumnHandle> getColumnHandles(SchemaTableName schemaTableName)
{
KafkaTopicDescription kafkaTopicDescription = getRequiredTopicDescription(schemaTableName);

ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
@@ -235,4 +246,28 @@ private Optional<KafkaTopicDescription> getTopicDescription(SchemaTableName schemaTableName)
.map(Optional::get)
.findFirst();
}

@Override
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> columns)
{
KafkaTableHandle table = (KafkaTableHandle) tableHandle;

return new KafkaTableHandle(
table.getSchemaName(),
table.getTableName(),
table.getTopicName(),
table.getKeyDataFormat(),
table.getMessageDataFormat(),
table.getKeyDataSchemaLocation(),
table.getMessageDataSchemaLocation(),
columns.stream()
.map(KafkaColumnHandle.class::cast)
.collect(toImmutableList()));
}

@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
return Optional.empty();
}
}
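beginInsert simply re-packages the table handle with the insert columns, and finishInsert has nothing to commit since Kafka producers publish as they go. The handle returned by beginInsert is what the engine passes to the page sink provider when the insert starts executing. KafkaPageSinkProvider is part of this commit but not loaded on this page; the following is a plausible outline of how it bridges handle and sink. The RowEncoderFactory interface and the getColumns() getter on KafkaTableHandle are assumptions made for the sketch:

import com.google.common.collect.ImmutableList;
import io.prestosql.plugin.kafka.encoder.EncoderColumnHandle;
import io.prestosql.plugin.kafka.encoder.RowEncoder;
import io.prestosql.spi.connector.ConnectorInsertTableHandle;
import io.prestosql.spi.connector.ConnectorOutputTableHandle;
import io.prestosql.spi.connector.ConnectorPageSink;
import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTransactionHandle;

import java.util.List;

import static java.util.Objects.requireNonNull;

// Hypothetical factory, standing in for whatever EncoderModule actually binds.
interface RowEncoderFactory
{
    RowEncoder create(ConnectorSession session, String dataFormat, List<? extends EncoderColumnHandle> columns);
}

public class KafkaPageSinkProvider
        implements ConnectorPageSinkProvider
{
    private final RowEncoderFactory encoderFactory;
    private final PlainTextKafkaProducerFactory producerFactory;

    public KafkaPageSinkProvider(RowEncoderFactory encoderFactory, PlainTextKafkaProducerFactory producerFactory)
    {
        this.encoderFactory = requireNonNull(encoderFactory, "encoderFactory is null");
        this.producerFactory = requireNonNull(producerFactory, "producerFactory is null");
    }

    @Override
    public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle)
    {
        throw new UnsupportedOperationException("CREATE TABLE AS is not supported for Kafka");
    }

    @Override
    public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertHandle)
    {
        KafkaTableHandle handle = (KafkaTableHandle) insertHandle;
        // How the commit splits columns between key and message is not visible
        // here; this sketch sends everything through the message encoder.
        RowEncoder keyEncoder = encoderFactory.create(session, handle.getKeyDataFormat(), ImmutableList.of());
        RowEncoder messageEncoder = encoderFactory.create(session, handle.getMessageDataFormat(), handle.getColumns()); // getColumns() assumed
        return new KafkaPageSink(handle.getTopicName(), handle.getColumns(), keyEncoder, messageEncoder, session, producerFactory);
    }
}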
156 changes: 156 additions & 0 deletions presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaPageSink.java
@@ -0,0 +1,156 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Shorts;
import com.google.common.primitives.SignedBytes;
import io.airlift.slice.Slice;
import io.prestosql.plugin.kafka.encoder.EncoderColumnHandle;
import io.prestosql.plugin.kafka.encoder.RowEncoder;
import io.prestosql.spi.Page;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.connector.ConnectorPageSink;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.type.TimestampType;
import io.prestosql.spi.type.TimestampWithTimeZoneType;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.VarcharType;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.BooleanType.BOOLEAN;
import static io.prestosql.spi.type.DateType.DATE;
import static io.prestosql.spi.type.DoubleType.DOUBLE;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.RealType.REAL;
import static io.prestosql.spi.type.SmallintType.SMALLINT;
import static io.prestosql.spi.type.TimeType.TIME;
import static io.prestosql.spi.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE;
import static io.prestosql.spi.type.TinyintType.TINYINT;
import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
import static java.lang.Float.intBitsToFloat;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;

public class KafkaPageSink
implements ConnectorPageSink
{
private final String topicName;
private final List<KafkaColumnHandle> columns;
private final RowEncoder keyEncoder;
private final RowEncoder messageEncoder;
private final ConnectorSession session;
private final KafkaProducer<byte[], byte[]> producer;

public KafkaPageSink(
String topicName,
List<KafkaColumnHandle> columns,
RowEncoder keyEncoder,
RowEncoder messageEncoder,
ConnectorSession session,
PlainTextKafkaProducerFactory producerFactory)
{
this.topicName = requireNonNull(topicName, "topicName is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
this.keyEncoder = requireNonNull(keyEncoder, "keyEncoder is null");
this.messageEncoder = requireNonNull(messageEncoder, "messageEncoder is null");
this.session = requireNonNull(session, "session is null");
this.producer = requireNonNull(producerFactory, "producerFactory is null").create(new ByteArraySerializer(), new ByteArraySerializer());
}

@Override
public CompletableFuture<?> appendPage(Page page)
{
// Only the message encoder is populated here, so records are produced without a key.
for (int position = 0; position < page.getPositionCount(); position++) {
for (int channel = 0; channel < page.getChannelCount(); channel++) {
appendColumn(messageEncoder, page, channel, position);
}
producer.send(new ProducerRecord<>(topicName, messageEncoder.toByteArray()));
keyEncoder.clear();
messageEncoder.clear();
}
producer.flush();
return NOT_BLOCKED;
}

private void appendColumn(RowEncoder rowEncoder, Page page, int channel, int position)
{
Block block = page.getBlock(channel);
EncoderColumnHandle columnHandle = columns.get(channel);
Type type = columnHandle.getType();
if (block.isNull(position)) {
rowEncoder.putNullValue(columnHandle);
}
else if (type == BOOLEAN) {
rowEncoder.put(columnHandle, type.getBoolean(block, position));
}
else if (type == BIGINT) {
rowEncoder.put(columnHandle, type.getLong(block, position));
}
else if (type == INTEGER) {
rowEncoder.put(columnHandle, toIntExact(type.getLong(block, position)));
}
else if (type == SMALLINT) {
rowEncoder.put(columnHandle, Shorts.checkedCast(type.getLong(block, position)));
}
else if (type == TINYINT) {
rowEncoder.put(columnHandle, SignedBytes.checkedCast(type.getLong(block, position)));
}
else if (type == DOUBLE) {
rowEncoder.put(columnHandle, type.getDouble(block, position));
}
else if (type == REAL) {
rowEncoder.put(columnHandle, intBitsToFloat(toIntExact(type.getLong(block, position))));
}
else if (type instanceof VarcharType) {
rowEncoder.put(columnHandle, type.getSlice(block, position).toStringUtf8());
}
else if (type == VARBINARY) {
rowEncoder.put(columnHandle, type.getSlice(block, position).toByteBuffer());
}
else if (type == DATE) {
rowEncoder.put(columnHandle, type.getObjectValue(session, block, position));
}
else if (type == TIME) {
rowEncoder.put(columnHandle, type.getObjectValue(session, block, position));
}
else if (type == TIME_WITH_TIME_ZONE) {
rowEncoder.put(columnHandle, type.getObjectValue(session, block, position));
}
else if (type instanceof TimestampType) {
rowEncoder.put(columnHandle, type.getObjectValue(session, block, position));
}
else if (type instanceof TimestampWithTimeZoneType) {
rowEncoder.put(columnHandle, type.getObjectValue(session, block, position));
}
}

@Override
public CompletableFuture<Collection<Slice>> finish()
{
producer.close();
return completedFuture(ImmutableList.of());
}

@Override
public void abort() {}
}
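The RowEncoder interface that appendColumn drives is also introduced elsewhere in this commit. Its shape can be read off the call sites above; a sketch inferred from them:

package io.prestosql.plugin.kafka.encoder;

import java.nio.ByteBuffer;

// Inferred from the call sites in KafkaPageSink; the interface added by the
// commit may differ (extra overloads, a different reset mechanism, etc.).
public interface RowEncoder
{
    void putNullValue(EncoderColumnHandle columnHandle);

    void put(EncoderColumnHandle columnHandle, boolean value);

    void put(EncoderColumnHandle columnHandle, long value);

    void put(EncoderColumnHandle columnHandle, int value);

    void put(EncoderColumnHandle columnHandle, short value);

    void put(EncoderColumnHandle columnHandle, byte value);

    void put(EncoderColumnHandle columnHandle, double value);

    void put(EncoderColumnHandle columnHandle, float value);

    void put(EncoderColumnHandle columnHandle, String value);

    void put(EncoderColumnHandle columnHandle, ByteBuffer value);

    void put(EncoderColumnHandle columnHandle, Object value);

    // Serialized bytes for the current row
    byte[] toByteArray();

    // Reset internal state for the next row
    void clear();
}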
(Diff truncated: the remaining changed files were not loaded.)
