Skip to content

Commit

Permalink
[core] Introduce binlog system table to pack the UB and UA (apache#4520)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Nov 14, 2024
1 parent 7e1fe39 commit 9e4b28a
Show file tree
Hide file tree
Showing 7 changed files with 381 additions and 6 deletions.
19 changes: 19 additions & 0 deletions docs/content/maintenance/system-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -406,4 +406,23 @@ SELECT * FROM T$statistics;
1 rows in set
*/
```
### Binlog Table

You can streaming or batch query the binlog through binlog table. In this system table,
the update before and update after will be packed in one row.

```
/*
+------------------+----------------------+-----------------------+
| rowkind | column_0 | column_1 |
+------------------+----------------------+-----------------------+
| +I | [col_0] | [col_1] |
+------------------+----------------------+-----------------------+
| +U | [col_0_ub, col_0_ua] | [col_1_ub, col_1_ua] |
+------------------+----------------------+-----------------------+
| -D | [col_0] | [col_1] |
+------------------+----------------------+-----------------------+
*/
```


Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.reader;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.function.BiFunction;

/** The reader which will pack the update before and update after message together. */
public class PackChangelogReader implements RecordReader<InternalRow> {

private final RecordReader<InternalRow> reader;
private final BiFunction<InternalRow, InternalRow, InternalRow> function;
private final InternalRowSerializer serializer;
private boolean initialized = false;

public PackChangelogReader(
RecordReader<InternalRow> reader,
BiFunction<InternalRow, InternalRow, InternalRow> function,
RowType rowType) {
this.reader = reader;
this.function = function;
this.serializer = new InternalRowSerializer(rowType);
}

@Nullable
@Override
public RecordIterator<InternalRow> readBatch() throws IOException {
if (!initialized) {
initialized = true;
return new InternRecordIterator(reader, function, serializer);
}
return null;
}

@Override
public void close() throws IOException {
reader.close();
}

private static class InternRecordIterator implements RecordIterator<InternalRow> {

private RecordIterator<InternalRow> currentBatch;

private final BiFunction<InternalRow, InternalRow, InternalRow> function;
private final RecordReader<InternalRow> reader;
private final InternalRowSerializer serializer;
private boolean endOfData;

public InternRecordIterator(
RecordReader<InternalRow> reader,
BiFunction<InternalRow, InternalRow, InternalRow> function,
InternalRowSerializer serializer) {
this.reader = reader;
this.function = function;
this.serializer = serializer;
this.endOfData = false;
}

@Nullable
@Override
public InternalRow next() throws IOException {
InternalRow row1 = nextRow();
if (row1 == null) {
return null;
}
InternalRow row2 = null;
if (row1.getRowKind() == RowKind.UPDATE_BEFORE) {
row1 = serializer.copy(row1);
row2 = nextRow();
}
return function.apply(row1, row2);
}

@Nullable
private InternalRow nextRow() throws IOException {
InternalRow row = null;
while (!endOfData && row == null) {
RecordIterator<InternalRow> batch = nextBatch();
if (batch == null) {
endOfData = true;
return null;
}

row = batch.next();
if (row == null) {
releaseBatch();
}
}
return row;
}

@Nullable
private RecordIterator<InternalRow> nextBatch() throws IOException {
if (currentBatch == null) {
currentBatch = reader.readBatch();
}
return currentBatch;
}

@Override
public void releaseBatch() {
if (currentBatch != null) {
currentBatch.releaseBatch();
currentBatch = null;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -526,13 +526,13 @@ public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubta
}
}

private class AuditLogRead implements InnerTableRead {
class AuditLogRead implements InnerTableRead {

private final InnerTableRead dataRead;
protected final InnerTableRead dataRead;

private int[] readProjection;
protected int[] readProjection;

private AuditLogRead(InnerTableRead dataRead) {
protected AuditLogRead(InnerTableRead dataRead) {
this.dataRead = dataRead.forceKeepDelete();
this.readProjection = defaultProjection();
}
Expand Down Expand Up @@ -600,9 +600,9 @@ private InternalRow convertRow(InternalRow data) {
}

/** A {@link ProjectedRow} which returns row kind when mapping index is negative. */
private static class AuditLogRow extends ProjectedRow {
static class AuditLogRow extends ProjectedRow {

private AuditLogRow(int[] indexMapping, InternalRow row) {
AuditLogRow(int[] indexMapping, InternalRow row) {
super(indexMapping);
replaceRow(row);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table.system;

import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.PackChangelogReader;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;

/**
* A {@link Table} for reading binlog of table. The binlog format is as below.
*
* <p>INSERT: [+I, [co1, null], [col2, null]]
*
* <p>UPDATE: [+U, [co1_ub, col1_ua], [col2_ub, col2_ua]]
*
* <p>DELETE: [-D, [co1, null], [col2, null]]
*/
public class BinlogTable extends AuditLogTable {

public static final String BINLOG = "binlog";

private final FileStoreTable wrapped;

public BinlogTable(FileStoreTable wrapped) {
super(wrapped);
this.wrapped = wrapped;
}

@Override
public String name() {
return wrapped.name() + SYSTEM_TABLE_SPLITTER + BINLOG;
}

@Override
public RowType rowType() {
List<DataField> fields = new ArrayList<>();
fields.add(SpecialFields.ROW_KIND);
for (DataField field : wrapped.rowType().getFields()) {
DataField newField =
new DataField(
field.id(),
field.name(),
new ArrayType(field.type().nullable()), // convert to nullable
field.description());
fields.add(newField);
}
return new RowType(fields);
}

@Override
public InnerTableRead newRead() {
return new BinlogRead(wrapped.newRead());
}

@Override
public Table copy(Map<String, String> dynamicOptions) {
return new BinlogTable(wrapped.copy(dynamicOptions));
}

private class BinlogRead extends AuditLogRead {

private BinlogRead(InnerTableRead dataRead) {
super(dataRead);
}

@Override
public RecordReader<InternalRow> createReader(Split split) throws IOException {
DataSplit dataSplit = (DataSplit) split;
if (dataSplit.isStreaming()) {
return new PackChangelogReader(
dataRead.createReader(split),
(row1, row2) ->
new AuditLogRow(
readProjection,
convertToArray(
row1, row2, wrapped.rowType().fieldGetters())),
wrapped.rowType());
} else {
return dataRead.createReader(split)
.transform(
(row) ->
new AuditLogRow(
readProjection,
convertToArray(
row,
null,
wrapped.rowType().fieldGetters())));
}
}

private InternalRow convertToArray(
InternalRow row1,
@Nullable InternalRow row2,
InternalRow.FieldGetter[] fieldGetters) {
GenericRow row = new GenericRow(row1.getFieldCount());
for (int i = 0; i < row1.getFieldCount(); i++) {
Object o1 = fieldGetters[i].getFieldOrNull(row1);
Object o2;
if (row2 != null) {
o2 = fieldGetters[i].getFieldOrNull(row2);
row.setField(i, new GenericArray(new Object[] {o1, o2}));
} else {
row.setField(i, new GenericArray(new Object[] {o1}));
}
}
// If no row2 provided, then follow the row1 kind.
if (row2 == null) {
row.setRowKind(row1.getRowKind());
} else {
row.setRowKind(row2.getRowKind());
}
return row;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.apache.paimon.table.system.AggregationFieldsTable.AGGREGATION_FIELDS;
import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
import static org.apache.paimon.table.system.BinlogTable.BINLOG;
import static org.apache.paimon.table.system.BranchesTable.BRANCHES;
import static org.apache.paimon.table.system.BucketsTable.BUCKETS;
import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
Expand Down Expand Up @@ -77,6 +78,7 @@ public class SystemTableLoader {
.put(READ_OPTIMIZED, ReadOptimizedTable::new)
.put(AGGREGATION_FIELDS, AggregationFieldsTable::new)
.put(STATISTICS, StatisticTable::new)
.put(BINLOG, BinlogTable::new)
.build();

public static final List<String> SYSTEM_TABLES = new ArrayList<>(SYSTEM_TABLE_LOADERS.keySet());
Expand Down
6 changes: 6 additions & 0 deletions paimon-flink/paimon-flink-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ under the License.
<artifactId>iceberg-data</artifactId>
<version>${iceberg.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>parquet-avro</artifactId>
<groupId>org.apache.parquet</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

Expand Down
Loading

0 comments on commit 9e4b28a

Please sign in to comment.