
[Issue-494] FLIP-107: Expose metadata for the new table source #504

Merged
Changes from all commits
32 commits
5a72ad3
[Issue-494] FLIP-107: Expose metadata for new Table source
thekingofcity Apr 26, 2021
ac9e447
[Issue-494] FLIP-107: Expose metadata for new Table source
thekingofcity Apr 27, 2021
c91163d
[Issue-494] FLIP-107: Expose metadata for new Table source
thekingofcity Apr 28, 2021
d8b6fd1
Merge branch 'master' into issue-494-FLIP-107-Expose-metadata-for-new…
thekingofcity Apr 29, 2021
546f2a6
[Issue-494] FLIP-107: Expose metadata for new Table source
thekingofcity Apr 30, 2021
9da412a
[Issue-494] FLIP-107: Expose metadata for new Table source
thekingofcity Apr 30, 2021
91ac18e
[Issue-494] FLIP-107: Expose metadata for new Table source
thekingofcity Apr 30, 2021
8065a10
[Issue-494] FLIP-107: Expose metadata for new Table source
thekingofcity Apr 30, 2021
74edaab
[Issue-494] FLIP-107: Expose metadata for new Table source
thekingofcity Apr 30, 2021
395fde9
[Issue-494] FLIP-107: Expose metadata for new Table source
thekingofcity Apr 30, 2021
8dcb605
[Issue-494] FLIP-107: Expose metadata for new Table source
thekingofcity May 14, 2021
3dd02b1
Merge branch 'master' into issue-494-FLIP-107-Expose-metadata-for-new…
thekingofcity May 14, 2021
5566d0f
remove redundant code
thekingofcity May 14, 2021
1734661
fix conflicting ITCase
thekingofcity May 14, 2021
0761e7e
remove redundant code
thekingofcity May 14, 2021
3c13d9d
update the doc
thekingofcity May 14, 2021
80604b1
fix copy
thekingofcity May 17, 2021
2043c19
remove useless test (won't throw error if fails)
thekingofcity May 17, 2021
7e59bf5
use sequence instead of array
thekingofcity May 17, 2021
30f9ef7
add a full initializer for FlinkPravegaDynamicTableSource
thekingofcity May 17, 2021
5a247c3
remove check according to flink's implementation
thekingofcity May 17, 2021
fd8fc39
relocate POJO
thekingofcity May 17, 2021
603487e
make initializer package-private
thekingofcity May 17, 2021
090f303
format
thekingofcity May 17, 2021
558cefe
format
thekingofcity May 17, 2021
9c1dc3c
metadataKeys null check
thekingofcity May 17, 2021
7463bef
get arity from correct and undeprecated way
thekingofcity May 18, 2021
979c95f
rename producedDataType to physicalDataType in tests
thekingofcity May 18, 2021
2b53f8d
remove redundant comment
thekingofcity May 18, 2021
a300efd
fix the wrong field for the copy function
thekingofcity May 18, 2021
cb17365
format
thekingofcity May 18, 2021
75fa7b9
use metadataKey.size() as virtualArity
thekingofcity May 18, 2021
60 changes: 44 additions & 16 deletions documentation/src/docs/table-api.md
@@ -46,26 +46,25 @@ Pravega Stream can be used as a table source/sink within a Flink table program.
The example below shows how to create a table connecting a Pravega stream as both source and sink:

```sql
-create table pravega (
-    user_id STRING,
+CREATE TABLE user_behavior (
+    user_id STRING,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    log_ts TIMESTAMP(3),
-    ts as log_ts + INTERVAL '1' SECOND,
-    watermark for ts as ts
-)
-with (
-    'connector' = 'pravega'
-    'controller-uri' = 'tcp://localhost:9090',
-    'scope' = 'scope',
-    'scan.execution.type' = 'streaming',
-    'scan.reader-group.name' = 'group1',
-    'scan.streams' = 'stream',
-    'sink.stream' = 'stream',
-    'sink.routing-key.field.name' = 'user_id',
-    'format' = 'json'
-)
+    ts as log_ts + INTERVAL '1' SECOND,
+    watermark for ts as ts
+)
+WITH (
+    'connector' = 'pravega',
+    'controller-uri' = 'tcp://localhost:9090',
+    'scope' = 'scope',
+    'scan.execution.type' = 'streaming',
+    'scan.streams' = 'stream',
+    'sink.stream' = 'stream',
+    'sink.routing-key.field.name' = 'user_id',
+    'format' = 'json'
+)
```

## Connector options
@@ -107,6 +106,35 @@ A `StreamCut` represents a consistent position in the stream, and can be fetched
`scan.start-streamcuts` and `scan.end-streamcuts` can be specified to perform a bounded read or a "start-at-some-point" read of Pravega streams.
The Pravega source supports reading from multiple streams; when reading from multiple streams, make sure the order of the stream cuts matches the order of the streams.
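
The sketch below shows how these options combine for a bounded read over two streams. It is a minimal sketch only: the `<streamcut-*>` values are hypothetical placeholders for the base64-encoded `StreamCut` strings fetched from the Pravega client.

```sql
CREATE TABLE bounded_scan (
    user_id STRING
)
WITH (
    'connector' = 'pravega',
    'controller-uri' = 'tcp://localhost:9090',
    'scope' = 'scope',
    'scan.streams' = 'stream1;stream2',
    'scan.start-streamcuts' = '<streamcut-a>;<streamcut-b>',
    'scan.end-streamcuts' = '<streamcut-c>;<streamcut-d>',
    'format' = 'json'
)
```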

### Read metadata from Pravega

The connector can provide event metadata (e.g. the event pointer) for each event.
This facilitates jobs that care about an event's position in the stream, e.g. for indexing purposes.

The `event_pointer` metadata is a sequence of bytes that can be read from Pravega through the connector.
To read it, add the `METADATA VIRTUAL` keywords to the end of the `event_pointer` field declaration:

```sql
CREATE TABLE test (
key STRING,
event_pointer BYTES METADATA VIRTUAL
)
WITH (
'connector' = 'pravega',
'controller-uri' = 'tcp://localhost:9090',
'scope' = 'scope',
'scan.streams' = 'stream',
'format' = 'json'
)
```
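
The virtual column can then be projected like any physical column. A hypothetical query against the table above:

```sql
-- event_pointer is filled in by the connector for every row it emits
SELECT `key`, event_pointer FROM test;
```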

After getting the bytes from the connector, they can be used to retrieve the original data from Pravega.

To get the data:
1. Convert the `byte[]` to a `ByteBuffer`: `ByteBuffer#wrap`
2. Recover the event pointer: `EventPointer#fromBytes`
3. Fetch the data: `EventStreamReader#fetchEvent`, as sketched below
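
A minimal sketch of these three steps, assuming `reader` is an `EventStreamReader<String>` already created for the same scope and stream (the class, method, and variable names here are hypothetical):

```java
import io.pravega.client.stream.EventPointer;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.NoSuchEventException;

import java.nio.ByteBuffer;

public final class EventPointerLookup {
    // Resolve an event_pointer value read from the table back to the original event.
    public static String fetchByPointer(EventStreamReader<String> reader, byte[] pointerBytes)
            throws NoSuchEventException {
        ByteBuffer buffer = ByteBuffer.wrap(pointerBytes);      // 1. wrap the raw bytes
        EventPointer pointer = EventPointer.fromBytes(buffer);  // 2. rebuild the event pointer
        return reader.fetchEvent(pointer);                      // 3. random-access read from Pravega
    }
}
```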

### Changelog Source

If the messages in a Pravega stream are change events captured from other databases using CDC tools, you can use a CDC format to interpret the messages as INSERT/UPDATE/DELETE messages in the Flink SQL system.
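
A minimal sketch, assuming the stream carries Debezium-formatted change events (`debezium-json` is a standard Flink format; the table name and columns here are hypothetical):

```sql
CREATE TABLE user_behavior_cdc (
    user_id STRING,
    item_id BIGINT
)
WITH (
    'connector' = 'pravega',
    'controller-uri' = 'tcp://localhost:9090',
    'scope' = 'scope',
    'scan.streams' = 'stream',
    'format' = 'debezium-json'
)
```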
66 changes: 66 additions & 0 deletions src/main/java/io/pravega/connectors/flink/dynamic/table/FlinkPravegaDynamicDeserializationSchema.java
@@ -0,0 +1,66 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.connectors.flink.dynamic.table;

import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.Serializer;
import io.pravega.connectors.flink.dynamic.table.FlinkPravegaDynamicTableSource.ReadableMetadata;
import io.pravega.connectors.flink.serialization.PravegaDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

import java.util.List;

public class FlinkPravegaDynamicDeserializationSchema extends PravegaDeserializationSchema<RowData> {
// metadata keys that the rowData has; a subset of the keys in ReadableMetadata
private final List<String> metadataKeys;

// arity of the source data type, excluding metadata fields
private final int physicalArity;

public FlinkPravegaDynamicDeserializationSchema(
TypeInformation<RowData> typeInfo,
int physicalArity,
Serializer<RowData> serializer,
List<String> metadataKeys) {
super(typeInfo, serializer);
this.metadataKeys = metadataKeys;
this.physicalArity = physicalArity;
}

@Override
public RowData extractEvent(EventRead<RowData> eventRead) {
RowData rowData = eventRead.getEvent();
if (metadataKeys.isEmpty()) {
return rowData;
}

// use GenericRowData to build the enriched row
final GenericRowData producedRow = new GenericRowData(rowData.getRowKind(), physicalArity + metadataKeys.size());

// copy the physical (original) fields
final GenericRowData physicalRow = (GenericRowData) rowData;
int pos = 0;
for (; pos < physicalArity; pos++) {
producedRow.setField(pos, physicalRow.getField(pos));
}

// append the metadata fields after the physical fields; an unsupported key leaves its field null
for (; pos < physicalArity + metadataKeys.size(); pos++) {
String metadataKey = metadataKeys.get(pos - physicalArity);
if (ReadableMetadata.EVENT_POINTER.key.equals(metadataKey)) {
producedRow.setField(pos, eventRead.getEventPointer().toBytes().array());
}
}

return producedRow;
}
}
src/main/java/io/pravega/connectors/flink/dynamic/table/FlinkPravegaDynamicTableSource.java
@@ -12,28 +12,41 @@
import io.pravega.connectors.flink.FlinkPravegaInputFormat;
import io.pravega.connectors.flink.FlinkPravegaReader;
import io.pravega.connectors.flink.PravegaConfig;
import io.pravega.connectors.flink.util.FlinkPravegaUtils.FlinkDeserializer;
import io.pravega.connectors.flink.util.StreamWithBoundaries;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;

-public class FlinkPravegaDynamicTableSource implements ScanTableSource {
+public class FlinkPravegaDynamicTableSource implements ScanTableSource, SupportsReadingMetadata {

// Source produced data type
-private final DataType producedDataType;
+protected DataType producedDataType;

// Data type to configure the format
private final DataType physicalDataType;

// Metadata that is appended at the end of a physical source row
private List<String> metadataKeys;

// Scan format for decoding records from Pravega
private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
@@ -72,7 +85,7 @@ public class FlinkPravegaDynamicTableSource implements ScanTableSource {

/**
* Creates a Pravega {@link DynamicTableSource}.
- * @param producedDataType source produced data type
+ * @param physicalDataType source physical data type
* @param decodingFormat scan format for decoding records from Pravega
* @param readerGroupName the reader group name
* @param pravegaConfig Pravega connection configuration
@@ -85,7 +98,7 @@ public class FlinkPravegaDynamicTableSource implements ScanTableSource {
* @param isStreamingReader flag to determine streaming or batch read
* @param isBounded flag to determine if the source stream is bounded
*/
-public FlinkPravegaDynamicTableSource(DataType producedDataType,
+public FlinkPravegaDynamicTableSource(DataType physicalDataType,
DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
String readerGroupName,
PravegaConfig pravegaConfig,
@@ -97,10 +110,49 @@ public FlinkPravegaDynamicTableSource(DataType producedDataType,
String uid,
boolean isStreamingReader,
boolean isBounded) {
this(
physicalDataType,
// producedDataType should be the same as physicalDataType on initialization
// and will be updated on `applyReadableMetadata`
physicalDataType,
// metadataKeys will be empty on initialization and will be updated on `applyReadableMetadata`
Collections.emptyList(),
decodingFormat,
readerGroupName,
pravegaConfig,
streams,
readerGroupRefreshTimeMillis,
checkpointInitiateTimeoutMillis,
eventReadTimeoutMillis,
maxOutstandingCheckpointRequest,
uid,
isStreamingReader,
isBounded
);
}

FlinkPravegaDynamicTableSource(DataType physicalDataType,
DataType producedDataType,
List<String> metadataKeys,
DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
String readerGroupName,
PravegaConfig pravegaConfig,
List<StreamWithBoundaries> streams,
long readerGroupRefreshTimeMillis,
long checkpointInitiateTimeoutMillis,
long eventReadTimeoutMillis,
int maxOutstandingCheckpointRequest,
String uid,
boolean isStreamingReader,
boolean isBounded) {
this.physicalDataType = Preconditions.checkNotNull(
physicalDataType, "Physical data type must not be null.");
this.producedDataType = Preconditions.checkNotNull(
producedDataType, "Produced data type must not be null.");
this.decodingFormat = Preconditions.checkNotNull(
decodingFormat, "Decoding format must not be null.");
this.metadataKeys = Preconditions.checkNotNull(
metadataKeys, "Metadata Keys must not be null.");
this.readerGroupName = readerGroupName;
this.pravegaConfig = Preconditions.checkNotNull(
pravegaConfig, "Pravega config must not be null.");
@@ -122,10 +174,20 @@ public ChangelogMode getChangelogMode() {

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
// create a PravegaDeserializationSchema that will expose metadata to the row
FlinkDeserializer<RowData> deserializer = new FlinkDeserializer<>(
decodingFormat.createRuntimeDecoder(runtimeProviderContext, producedDataType));
final FlinkPravegaDynamicDeserializationSchema flinkDeserializer
= new FlinkPravegaDynamicDeserializationSchema(
runtimeProviderContext.createTypeInformation(producedDataType),
producedDataType.getChildren().size() - metadataKeys.size(),
deserializer,
metadataKeys);

if (isStreamingReader) {
FlinkPravegaReader.Builder<RowData> readerBuilder = FlinkPravegaReader.<RowData>builder()
.withPravegaConfig(pravegaConfig)
-.withDeserializationSchema(decodingFormat.createRuntimeDecoder(runtimeProviderContext, producedDataType))
+.withDeserializationSchema(flinkDeserializer)
.withReaderGroupRefreshTime(Time.milliseconds(readerGroupRefreshTimeMillis))
.withCheckpointInitiateTimeout(Time.milliseconds(checkpointInitiateTimeoutMillis))
.withEventReadTimeout(Time.milliseconds(eventReadTimeoutMillis))
@@ -142,7 +204,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
} else {
FlinkPravegaInputFormat.Builder<RowData> inputFormatBuilder = FlinkPravegaInputFormat.<RowData>builder()
.withPravegaConfig(pravegaConfig)
-.withDeserializationSchema(decodingFormat.createRuntimeDecoder(runtimeProviderContext, producedDataType));
+.withDeserializationSchema(flinkDeserializer);

for (StreamWithBoundaries stream : streams) {
inputFormatBuilder.forStream(stream.getStream(), stream.getFrom(), stream.getTo());
@@ -155,7 +217,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
@Override
public DynamicTableSource copy() {
return new FlinkPravegaDynamicTableSource(
this.physicalDataType,
this.producedDataType,
this.metadataKeys,
this.decodingFormat,
this.readerGroupName,
this.pravegaConfig,
@@ -190,7 +254,9 @@ public boolean equals(Object o) {
isStreamingReader == that.isStreamingReader &&
isBounded == that.isBounded &&
producedDataType.equals(that.producedDataType) &&
physicalDataType.equals(that.physicalDataType) &&
decodingFormat.equals(that.decodingFormat) &&
metadataKeys.equals(that.metadataKeys) &&
Objects.equals(readerGroupName, that.readerGroupName) &&
pravegaConfig.equals(that.pravegaConfig) &&
streams.equals(that.streams) &&
@@ -201,7 +267,9 @@ public int hashCode() {
public int hashCode() {
return Objects.hash(
producedDataType,
physicalDataType,
decodingFormat,
metadataKeys,
readerGroupName,
pravegaConfig,
streams,
@@ -213,4 +281,33 @@ public int hashCode() {
isStreamingReader,
isBounded);
}

@Override
public Map<String, DataType> listReadableMetadata() {
final Map<String, DataType> metadataMap = new LinkedHashMap<>();
Stream.of(ReadableMetadata.values()).forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
return metadataMap;
}

@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
this.metadataKeys = metadataKeys;
this.producedDataType = producedDataType;
}

enum ReadableMetadata {
EVENT_POINTER(
"event_pointer",
DataTypes.BYTES().notNull()
);

final String key;

final DataType dataType;

ReadableMetadata(String key, DataType dataType) {
this.key = key;
this.dataType = dataType;
}
}
}
src/main/java/io/pravega/connectors/flink/util/FlinkPravegaUtils.java
@@ -26,6 +26,7 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Objects;

@@ -121,7 +122,7 @@ public static <T> EventStreamReader<T> createPravegaReader(
*
* @param <T> The type of the event.
*/
-public static final class FlinkDeserializer<T> implements Serializer<T> {
+public static final class FlinkDeserializer<T> implements Serializer<T>, Serializable {

private final DeserializationSchema<T> deserializationSchema;
