Skip to content

Commit

Permalink
[core] Support catalog options table
Browse files Browse the repository at this point in the history
  • Loading branch information
schnappi17 committed Aug 25, 2023
1 parent 7a61984 commit 9b12364
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 4 deletions.
16 changes: 16 additions & 0 deletions docs/content/how-to/system-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,19 @@ SELECT * FROM sys.all_table_options;
*/
```

### Catalog Options Table
You can query the catalog's option information through catalog options table. The options not shown will be the default value. You can take reference to [Configuration]({{< ref "maintenance/configurations#coreoptions" >}}).

```sql
SELECT * FROM sys.catalog_options;

/*
+-----------+---------------------------+
| key | value |
+-----------+---------------------------+
| warehouse | hdfs:///path/to/warehouse |
+-----------+---------------------------+
1 rows in set
*/
```

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.StringUtils;

Expand All @@ -50,17 +51,20 @@ public abstract class AbstractCatalog implements Catalog {
public static final String DB_SUFFIX = ".db";
protected static final String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
protected static final List<String> GLOBAL_TABLES =
Arrays.asList(AllTableOptionsTable.ALL_TABLE_OPTIONS);
Arrays.asList(
AllTableOptionsTable.ALL_TABLE_OPTIONS, CatalogOptionsTable.CATALOG_OPTIONS);

protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
protected final Map<String, String> catalogOptions;

@Nullable protected final LineageMeta lineageMeta;

protected AbstractCatalog(FileIO fileIO) {
this.fileIO = fileIO;
this.lineageMeta = null;
this.tableDefaultOptions = new HashMap<>();
this.catalogOptions = new HashMap<>();
}

protected AbstractCatalog(FileIO fileIO, Map<String, String> options) {
Expand All @@ -69,6 +73,7 @@ protected AbstractCatalog(FileIO fileIO, Map<String, String> options) {
findAndCreateLineageMeta(
Options.fromMap(options), AbstractCatalog.class.getClassLoader());
this.tableDefaultOptions = new HashMap<>();
this.catalogOptions = options;

options.keySet().stream()
.filter(key -> key.startsWith(TABLE_DEFAULT_OPTION_PREFIX))
Expand All @@ -94,7 +99,9 @@ private LineageMeta findAndCreateLineageMeta(Options options, ClassLoader classL
public Table getTable(Identifier identifier) throws TableNotExistException {
if (isSystemDatabase(identifier.getDatabaseName())) {
String tableName = identifier.getObjectName();
Table table = SystemTableLoader.loadGlobal(tableName, fileIO, allTablePaths());
Table table =
SystemTableLoader.loadGlobal(
tableName, fileIO, allTablePaths(), catalogOptions);
if (table == null) {
throw new TableNotExistException(identifier);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table.system;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.ProjectedRow;

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.utils.SerializationUtils.newStringType;

/** This is a system {@link Table} to display catalog options. */
public class CatalogOptionsTable implements ReadonlyTable {

public static final String CATALOG_OPTIONS = "catalog_options";

private final Map<String, String> catalogOptions;

public static final RowType TABLE_TYPE =
new RowType(
Arrays.asList(
new DataField(0, "key", newStringType(false)),
new DataField(1, "value", newStringType(false))));

public CatalogOptionsTable(Map<String, String> catalogOptions) {
this.catalogOptions = catalogOptions;
}

/** A name to identify this table. */
@Override
public String name() {
return CATALOG_OPTIONS;
}

/** Returns the row type of this table. */
@Override
public RowType rowType() {
return TABLE_TYPE;
}

@Override
public InnerTableScan newScan() {
return new CatalogOptionsScan();
}

@Override
public InnerTableRead newRead() {
return new CatalogOptionsRead();
}

/** Primary keys of this table. */
@Override
public List<String> primaryKeys() {
return Collections.singletonList("key");
}

@Override
public Table copy(Map<String, String> dynamicOptions) {
return new CatalogOptionsTable(catalogOptions);
}

private class CatalogOptionsScan extends ReadOnceTableScan {

@Override
public InnerTableScan withFilter(Predicate predicate) {
return this;
}

@Override
public Plan innerPlan() {
return () ->
Collections.singletonList(
new CatalogOptionsTable.CatalogOptionsSplit(catalogOptions));
}
}

private static class CatalogOptionsSplit implements Split {

private static final long serialVersionUID = 1L;

private final Map<String, String> catalogOptions;

private CatalogOptionsSplit(Map<String, String> catalogOptions) {
this.catalogOptions = catalogOptions;
}

@Override
public long rowCount() {
return catalogOptions.size();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CatalogOptionsTable.CatalogOptionsSplit that =
(CatalogOptionsTable.CatalogOptionsSplit) o;
return catalogOptions.equals(that.catalogOptions);
}

@Override
public int hashCode() {
return catalogOptions.hashCode();
}
}

private static class CatalogOptionsRead implements InnerTableRead {

private int[][] projection;

@Override
public InnerTableRead withFilter(Predicate predicate) {
return this;
}

@Override
public InnerTableRead withProjection(int[][] projection) {
this.projection = projection;
return this;
}

@Override
public TableRead withIOManager(IOManager ioManager) {
return this;
}

@Override
public RecordReader<InternalRow> createReader(Split split) throws IOException {
if (!(split instanceof CatalogOptionsTable.CatalogOptionsSplit)) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
Iterator<InternalRow> rows =
Iterators.transform(
((CatalogOptionsSplit) split).catalogOptions.entrySet().iterator(),
this::toRow);
if (projection != null) {
rows =
Iterators.transform(
rows, row -> ProjectedRow.from(projection).replaceRow(row));
}
return new IteratorRecordReader<>(rows);
}

private InternalRow toRow(Map.Entry<String, String> option) {
return GenericRow.of(
BinaryString.fromString(option.getKey()),
BinaryString.fromString(option.getValue()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
import static org.apache.paimon.table.system.FilesTable.FILES;
import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
Expand Down Expand Up @@ -70,10 +71,15 @@ public static Table load(String type, FileIO fileIO, FileStoreTable dataTable) {

@Nullable
public static Table loadGlobal(
String tableName, FileIO fileIO, Map<String, Map<String, Path>> allTablePaths) {
String tableName,
FileIO fileIO,
Map<String, Map<String, Path>> allTablePaths,
Map<String, String> catalogOptions) {
switch (tableName.toLowerCase()) {
case ALL_TABLE_OPTIONS:
return new AllTableOptionsTable(fileIO, allTablePaths);
case CATALOG_OPTIONS:
return new CatalogOptionsTable(catalogOptions);
default:
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table.system;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.TableType;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link CatalogOptionsTable}. */
public class CatalogOptionsTableTest extends TableTestBase {

private Catalog catalog;

private CatalogOptionsTable catalogOptionsTable;
private Options catalogOptions;
@TempDir java.nio.file.Path tempDir;

@BeforeEach
public void before() throws Exception {
catalogOptions = new Options();
catalogOptions.set(CatalogOptions.TABLE_TYPE, TableType.MANAGED);
catalogOptions.set("table-default.scan.infer-parallelism", "false");
catalogOptions.set(CatalogOptions.WAREHOUSE, tempDir.toUri().toString());
catalog = CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
catalogOptionsTable =
(CatalogOptionsTable) catalog.getTable(new Identifier("sys", "catalog_options"));
}

@Test
public void testCatalogOptionsTable() throws Exception {
List<InternalRow> expectRow = getExceptedResult();
List<InternalRow> result = read(catalogOptionsTable);
assertThat(result).containsExactlyElementsOf(expectRow);
}

private List<InternalRow> getExceptedResult() {
List<InternalRow> expectedRow = new ArrayList<>();
for (Map.Entry<String, String> option : catalogOptions.toMap().entrySet()) {
expectedRow.add(
GenericRow.of(
BinaryString.fromString(option.getKey()),
BinaryString.fromString(option.getValue())));
}
return expectedRow;
}
}
Loading

0 comments on commit 9b12364

Please sign in to comment.