diff --git a/docs/content/maintenance/system-tables.md b/docs/content/maintenance/system-tables.md
index 462f8c27f887..0246d6faf8dc 100644
--- a/docs/content/maintenance/system-tables.md
+++ b/docs/content/maintenance/system-tables.md
@@ -406,4 +406,23 @@ SELECT * FROM T$statistics;
 1 rows in set
 */
 ```
 
+### Binlog Table
+
+You can query the binlog of a table in streaming or batch mode through its binlog table. In this
+system table, the update before and the update after of a change are packed into a single row.
+
+```
+/*
++------------------+----------------------+-----------------------+
+|     rowkind      |       column_0       |       column_1        |
++------------------+----------------------+-----------------------+
+|        +I        |       [col_0]        |        [col_1]        |
++------------------+----------------------+-----------------------+
+|        +U        | [col_0_ub, col_0_ua] | [col_1_ub, col_1_ua]  |
++------------------+----------------------+-----------------------+
+|        -D        |       [col_0]        |        [col_1]        |
++------------------+----------------------+-----------------------+
+*/
+```
+
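As a quick illustration of the new docs section (not part of the patch): a Flink job could consume the binlog table like this. The catalog name `paimon` and the primary-keyed table `T` are assumptions taken from the surrounding examples.

```java
// Hypothetical usage sketch, not part of this change: streaming the binlog of
// table `T` via Flink SQL. Assumes a Paimon catalog is already registered as `paimon`.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BinlogQueryExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql("USE CATALOG paimon");
        // Each result row packs the update-before/update-after values into one array per column.
        tEnv.executeSql("SELECT * FROM T$binlog /*+ OPTIONS('scan.mode' = 'latest') */")
                .print();
    }
}
```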
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/PackChangelogReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/PackChangelogReader.java
new file mode 100644
index 000000000000..a60780ff5e06
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/PackChangelogReader.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.function.BiFunction;
+
+/** A reader which packs the update before and update after messages together. */
+public class PackChangelogReader implements RecordReader<InternalRow> {
+
+    private final RecordReader<InternalRow> reader;
+    private final BiFunction<InternalRow, InternalRow, InternalRow> function;
+    private final InternalRowSerializer serializer;
+    private boolean initialized = false;
+
+    public PackChangelogReader(
+            RecordReader<InternalRow> reader,
+            BiFunction<InternalRow, InternalRow, InternalRow> function,
+            RowType rowType) {
+        this.reader = reader;
+        this.function = function;
+        this.serializer = new InternalRowSerializer(rowType);
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<InternalRow> readBatch() throws IOException {
+        if (!initialized) {
+            initialized = true;
+            return new InternRecordIterator(reader, function, serializer);
+        }
+        return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    private static class InternRecordIterator implements RecordIterator<InternalRow> {
+
+        private RecordIterator<InternalRow> currentBatch;
+
+        private final BiFunction<InternalRow, InternalRow, InternalRow> function;
+        private final RecordReader<InternalRow> reader;
+        private final InternalRowSerializer serializer;
+        private boolean endOfData;
+
+        public InternRecordIterator(
+                RecordReader<InternalRow> reader,
+                BiFunction<InternalRow, InternalRow, InternalRow> function,
+                InternalRowSerializer serializer) {
+            this.reader = reader;
+            this.function = function;
+            this.serializer = serializer;
+            this.endOfData = false;
+        }
+
+        @Nullable
+        @Override
+        public InternalRow next() throws IOException {
+            InternalRow row1 = nextRow();
+            if (row1 == null) {
+                return null;
+            }
+            InternalRow row2 = null;
+            if (row1.getRowKind() == RowKind.UPDATE_BEFORE) {
+                row1 = serializer.copy(row1);
+                row2 = nextRow();
+            }
+            return function.apply(row1, row2);
+        }
+
+        @Nullable
+        private InternalRow nextRow() throws IOException {
+            InternalRow row = null;
+            while (!endOfData && row == null) {
+                RecordIterator<InternalRow> batch = nextBatch();
+                if (batch == null) {
+                    endOfData = true;
+                    return null;
+                }
+
+                row = batch.next();
+                if (row == null) {
+                    releaseBatch();
+                }
+            }
+            return row;
+        }
+
+        @Nullable
+        private RecordIterator<InternalRow> nextBatch() throws IOException {
+            if (currentBatch == null) {
+                currentBatch = reader.readBatch();
+            }
+            return currentBatch;
+        }
+
+        @Override
+        public void releaseBatch() {
+            if (currentBatch != null) {
+                currentBatch.releaseBatch();
+                currentBatch = null;
+            }
+        }
+    }
+}
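The reader above only guarantees that an UPDATE_BEFORE row and the UPDATE_AFTER row that follows it reach the supplied BiFunction in one call. A minimal caller-side sketch (not part of the patch; `upstream`, `rowType` and the merge function are illustrative, while BinlogTable below installs its own function that builds array-packed rows):

```java
// Hypothetical sketch, not part of this change: wrap an existing changelog
// reader so that each -U/+U pair surfaces as a single row. Here the merge
// function simply keeps the update-after image when one is present.
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.PackChangelogReader;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.RowType;

public class PackChangelogExample {

    public static RecordReader<InternalRow> packed(
            RecordReader<InternalRow> upstream, RowType rowType) {
        // row2 is null for +I/-D records and holds the UPDATE_AFTER row for -U/+U pairs.
        return new PackChangelogReader(
                upstream, (row1, row2) -> row2 == null ? row1 : row2, rowType);
    }
}
```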
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index e0acd9fb38ea..7438f9393d60 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -526,13 +526,13 @@ public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubta
         }
     }
 
-    private class AuditLogRead implements InnerTableRead {
+    class AuditLogRead implements InnerTableRead {
 
-        private final InnerTableRead dataRead;
+        protected final InnerTableRead dataRead;
 
-        private int[] readProjection;
+        protected int[] readProjection;
 
-        private AuditLogRead(InnerTableRead dataRead) {
+        protected AuditLogRead(InnerTableRead dataRead) {
             this.dataRead = dataRead.forceKeepDelete();
             this.readProjection = defaultProjection();
         }
@@ -600,9 +600,9 @@ private InternalRow convertRow(InternalRow data) {
     }
 
     /** A {@link ProjectedRow} which returns row kind when mapping index is negative. */
-    private static class AuditLogRow extends ProjectedRow {
+    static class AuditLogRow extends ProjectedRow {
 
-        private AuditLogRow(int[] indexMapping, InternalRow row) {
+        AuditLogRow(int[] indexMapping, InternalRow row) {
             super(indexMapping);
             replaceRow(row);
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
new file mode 100644
index 000000000000..96f9f6ed6185
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.PackChangelogReader;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+
+/**
+ * A {@link Table} for reading the binlog of a table. The binlog format is as below.
+ *
+ * <p>INSERT: [+I, [col1, null], [col2, null]]
+ *
+ * <p>UPDATE: [+U, [col1_ub, col1_ua], [col2_ub, col2_ua]]
+ *
+ * <p>DELETE: [-D, [col1, null], [col2, null]]
+ */
+public class BinlogTable extends AuditLogTable {
+
+    public static final String BINLOG = "binlog";
+
+    private final FileStoreTable wrapped;
+
+    public BinlogTable(FileStoreTable wrapped) {
+        super(wrapped);
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    public String name() {
+        return wrapped.name() + SYSTEM_TABLE_SPLITTER + BINLOG;
+    }
+
+    @Override
+    public RowType rowType() {
+        List<DataField> fields = new ArrayList<>();
+        fields.add(SpecialFields.ROW_KIND);
+        for (DataField field : wrapped.rowType().getFields()) {
+            DataField newField =
+                    new DataField(
+                            field.id(),
+                            field.name(),
+                            new ArrayType(field.type().nullable()), // convert to nullable
+                            field.description());
+            fields.add(newField);
+        }
+        return new RowType(fields);
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        return new BinlogRead(wrapped.newRead());
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new BinlogTable(wrapped.copy(dynamicOptions));
+    }
+
+    private class BinlogRead extends AuditLogRead {
+
+        private BinlogRead(InnerTableRead dataRead) {
+            super(dataRead);
+        }
+
+        @Override
+        public RecordReader<InternalRow> createReader(Split split) throws IOException {
+            DataSplit dataSplit = (DataSplit) split;
+            if (dataSplit.isStreaming()) {
+                return new PackChangelogReader(
+                        dataRead.createReader(split),
+                        (row1, row2) ->
+                                new AuditLogRow(
+                                        readProjection,
+                                        convertToArray(
+                                                row1, row2, wrapped.rowType().fieldGetters())),
+                        wrapped.rowType());
+            } else {
+                return dataRead.createReader(split)
+                        .transform(
+                                (row) ->
+                                        new AuditLogRow(
+                                                readProjection,
+                                                convertToArray(
+                                                        row,
+                                                        null,
+                                                        wrapped.rowType().fieldGetters())));
+            }
+        }
+
+        private InternalRow convertToArray(
+                InternalRow row1,
+                @Nullable InternalRow row2,
+                InternalRow.FieldGetter[] fieldGetters) {
+            GenericRow row = new GenericRow(row1.getFieldCount());
+            for (int i = 0; i < row1.getFieldCount(); i++) {
+                Object o1 = fieldGetters[i].getFieldOrNull(row1);
+                Object o2;
+                if (row2 != null) {
+                    o2 = fieldGetters[i].getFieldOrNull(row2);
+                    row.setField(i, new GenericArray(new Object[] {o1, o2}));
+                } else {
+                    row.setField(i, new GenericArray(new Object[] {o1}));
+                }
+            }
+            // If no row2 is provided, follow the row kind of row1.
+            if (row2 == null) {
+                row.setRowKind(row1.getRowKind());
+            } else {
+                row.setRowKind(row2.getRowKind());
+            }
+            return row;
+        }
+    }
+}
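To make the rowType() conversion above concrete (not part of the patch): for a table with columns a INT and b INT, the binlog table exposes a rowkind column plus one array column per data column, with nullable elements so the update-before slot can hold null. The field ids and names below are illustrative only; the real implementation reuses SpecialFields.ROW_KIND and the wrapped table's field ids.

```java
// Hypothetical sketch, not part of this change: the shape of the binlog row
// type for a table (a INT, b INT). Field ids/names are illustrative only.
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import java.util.Arrays;

public class BinlogRowTypeExample {
    public static void main(String[] args) {
        RowType binlogType =
                new RowType(
                        Arrays.asList(
                                new DataField(0, "rowkind", DataTypes.STRING()),
                                // a: [a_update_before, a_update_after] for +U, [a] otherwise
                                new DataField(1, "a", new ArrayType(DataTypes.INT().nullable())),
                                new DataField(2, "b", new ArrayType(DataTypes.INT().nullable()))));
        System.out.println(binlogType);
    }
}
```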
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index a84f41ec1a51..3d5b211316ec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -41,6 +41,7 @@
 import static org.apache.paimon.table.system.AggregationFieldsTable.AGGREGATION_FIELDS;
 import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
 import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
+import static org.apache.paimon.table.system.BinlogTable.BINLOG;
 import static org.apache.paimon.table.system.BranchesTable.BRANCHES;
 import static org.apache.paimon.table.system.BucketsTable.BUCKETS;
 import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
@@ -77,6 +78,7 @@ public class SystemTableLoader {
                     .put(READ_OPTIMIZED, ReadOptimizedTable::new)
                     .put(AGGREGATION_FIELDS, AggregationFieldsTable::new)
                     .put(STATISTICS, StatisticTable::new)
+                    .put(BINLOG, BinlogTable::new)
                     .build();
 
     public static final List<String> SYSTEM_TABLES = new ArrayList<>(SYSTEM_TABLE_LOADERS.keySet());
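With the registration above, a query against `<table>$binlog` resolves through SystemTableLoader like any other system table. Equivalently (not part of the patch), the system table can be constructed directly from an already-loaded FileStoreTable; `dataTable` is assumed to be supplied by the caller.

```java
// Hypothetical sketch, not part of this change: building the binlog system
// table programmatically instead of resolving it by name through the catalog.
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.BinlogTable;

public class BinlogTableExample {
    public static Table binlogOf(FileStoreTable dataTable) {
        return new BinlogTable(dataTable);
    }
}
```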
diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml
index 4452af266e5e..91222983bf6b 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -162,6 +162,12 @@ under the License.
             <artifactId>iceberg-data</artifactId>
             <version>${iceberg.version}</version>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>parquet-avro</artifactId>
+                    <groupId>org.apache.parquet</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
new file mode 100644
index 000000000000..771f4acc5e58
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for system table. */
+public class SystemTableITCase extends CatalogTableITCase {
+
+    @Test
+    public void testBinlogTableStreamRead() throws Exception {
+        sql(
+                "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED) with ('changelog-producer' = 'lookup', "
+                        + "'bucket' = '2')");
+        BlockingIterator<Row, Row> iterator =
+                streamSqlBlockIter("SELECT * FROM T$binlog /*+ OPTIONS('scan.mode' = 'latest') */");
+        sql("INSERT INTO T VALUES (1, 2)");
+        sql("INSERT INTO T VALUES (1, 3)");
+        sql("INSERT INTO T VALUES (2, 2)");
+        List<Row> rows = iterator.collect(3);
+        assertThat(rows)
+                .containsExactly(
+                        Row.of("+I", new Integer[] {1}, new Integer[] {2}),
+                        Row.of("+U", new Integer[] {1, 1}, new Integer[] {2, 3}),
+                        Row.of("+I", new Integer[] {2}, new Integer[] {2}));
+        iterator.close();
+    }
+
+    @Test
+    public void testBinlogTableBatchRead() throws Exception {
+        sql(
+                "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED) with ('changelog-producer' = 'lookup', "
+                        + "'bucket' = '2')");
+        sql("INSERT INTO T VALUES (1, 2)");
+        sql("INSERT INTO T VALUES (1, 3)");
+        sql("INSERT INTO T VALUES (2, 2)");
+        List<Row> rows = sql("SELECT * FROM T$binlog /*+ OPTIONS('scan.mode' = 'latest') */");
+        assertThat(rows)
+                .containsExactly(
+                        Row.of("+I", new Integer[] {1}, new Integer[] {3}),
+                        Row.of("+I", new Integer[] {2}, new Integer[] {2}));
+    }
+}