Skip to content

Commit

Permalink
Add extra_properties to iceberg table properties
Browse files Browse the repository at this point in the history
Some policies require adding extra properties to Iceberg tables
(e.g., for tracking table origin).

Co-authored-by: Priyansh121096 <[email protected]>
  • Loading branch information
2 people authored and sopel39 committed Oct 30, 2024
1 parent cef4f04 commit 4152f38
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 2 deletions.
4 changes: 4 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,10 @@ connector using a {doc}`WITH </sql/create-table-as>` clause.
- Comma-separated list of columns to use for Parquet bloom filter. It improves
the performance of queries using Equality and IN predicates when reading
Parquet files. Requires Parquet format. Defaults to `[]`.
* - `extra_properties`
- Additional properties added to an Iceberg table. The properties are not used by Trino,
and are available in the `$properties` metadata table.
The properties are not included in the output of `SHOW CREATE TABLE` statements.
:::

The table definition below specifies to use Parquet files, partitioning by columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,21 @@
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.trino.plugin.hive.orc.OrcWriterConfig;
import io.trino.spi.TrinoException;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.TypeManager;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX;
import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MIN;
Expand All @@ -35,6 +40,11 @@
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS;
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP;
import static org.apache.iceberg.TableProperties.RESERVED_PROPERTIES;

public class IcebergTableProperties
{
Expand All @@ -46,13 +56,36 @@ public class IcebergTableProperties
public static final String ORC_BLOOM_FILTER_COLUMNS_PROPERTY = "orc_bloom_filter_columns";
public static final String ORC_BLOOM_FILTER_FPP_PROPERTY = "orc_bloom_filter_fpp";
public static final String PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY = "parquet_bloom_filter_columns";
public static final String EXTRA_PROPERTIES_PROPERTY = "extra_properties";

// Names of all Trino-level table properties registered for Iceberg tables; the
// constructor checkState verifies every registered PropertyMetadata name is listed here,
// and createTableProperties rejects extra_properties keys that collide with these
public static final Set<String> SUPPORTED_PROPERTIES = ImmutableSet.<String>builder()
.add(FILE_FORMAT_PROPERTY)
.add(PARTITIONING_PROPERTY)
.add(SORTED_BY_PROPERTY)
.add(LOCATION_PROPERTY)
.add(FORMAT_VERSION_PROPERTY)
.add(ORC_BLOOM_FILTER_COLUMNS_PROPERTY)
.add(ORC_BLOOM_FILTER_FPP_PROPERTY)
.add(EXTRA_PROPERTIES_PROPERTY)
.add(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY)
.build();

// These properties are used by Trino or Iceberg internally and cannot be set directly by users through extra_properties
// (includes Iceberg's own RESERVED_PROPERTIES plus the native keys Trino manages via dedicated table properties)
public static final Set<String> PROTECTED_ICEBERG_NATIVE_PROPERTIES = ImmutableSet.<String>builder()
.addAll(RESERVED_PROPERTIES)
.add(ORC_BLOOM_FILTER_COLUMNS)
.add(ORC_BLOOM_FILTER_FPP)
.add(DEFAULT_FILE_FORMAT)
.add(FORMAT_VERSION)
.build();

private final List<PropertyMetadata<?>> tableProperties;

@Inject
public IcebergTableProperties(
IcebergConfig icebergConfig,
OrcWriterConfig orcWriterConfig)
OrcWriterConfig orcWriterConfig,
TypeManager typeManager)
{
tableProperties = ImmutableList.<PropertyMetadata<?>>builder()
.add(enumProperty(
Expand Down Expand Up @@ -120,7 +153,31 @@ public IcebergTableProperties(
.map(name -> name.toLowerCase(ENGLISH))
.collect(toImmutableList()),
value -> value))
.add(new PropertyMetadata<>(
EXTRA_PROPERTIES_PROPERTY,
"Extra table properties",
new MapType(VARCHAR, VARCHAR, typeManager.getTypeOperators()),
Map.class,
null,
true, // currently not shown in SHOW CREATE TABLE
value -> {
Map<String, String> extraProperties = (Map<String, String>) value;
if (extraProperties.containsValue(null)) {
throw new TrinoException(INVALID_TABLE_PROPERTY, format("Extra table property value cannot be null '%s'", extraProperties));
}
if (extraProperties.containsKey(null)) {
throw new TrinoException(INVALID_TABLE_PROPERTY, format("Extra table property key cannot be null '%s'", extraProperties));
}

return extraProperties;
},
value -> value))
.build();

checkState(SUPPORTED_PROPERTIES.containsAll(tableProperties.stream()
.map(PropertyMetadata::getName)
.collect(toImmutableList())),
"%s does not contain all supported properties", SUPPORTED_PROPERTIES);
}

public List<PropertyMetadata<?>> getTableProperties()
Expand Down Expand Up @@ -188,4 +245,9 @@ public static List<String> getParquetBloomFilterColumns(Map<String, Object> tabl
List<String> parquetBloomFilterColumns = (List<String>) tableProperties.get(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY);
return parquetBloomFilterColumns == null ? ImmutableList.of() : ImmutableList.copyOf(parquetBloomFilterColumns);
}

/**
 * Extracts the user-supplied {@code extra_properties} map from decoded table properties,
 * or {@link Optional#empty()} when the property was not set.
 */
public static Optional<Map<String, String>> getExtraProperties(Map<String, Object> tableProperties)
{
    @SuppressWarnings("unchecked")
    Map<String, String> extraProperties = (Map<String, String>) tableProperties.get(EXTRA_PROPERTIES_PROPERTY);
    return Optional.ofNullable(extraProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PROTECTED_ICEBERG_NATIVE_PROPERTIES;
import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.SUPPORTED_PROPERTIES;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
import static io.trino.plugin.iceberg.IcebergTableProperties.getSortOrder;
import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
Expand Down Expand Up @@ -832,7 +834,29 @@ public static Map<String, String> createTableProperties(ConnectorTableMetadata t
if (tableMetadata.getComment().isPresent()) {
propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
}
return propertiesBuilder.buildOrThrow();

Map<String, String> baseProperties = propertiesBuilder.buildOrThrow();
Map<String, String> extraProperties = IcebergTableProperties.getExtraProperties(tableMetadata.getProperties()).orElseGet(ImmutableMap::of);

Set<String> illegalExtraProperties = Sets.intersection(
ImmutableSet.<String>builder()
.add(TABLE_COMMENT)
.addAll(baseProperties.keySet())
.addAll(SUPPORTED_PROPERTIES)
.addAll(PROTECTED_ICEBERG_NATIVE_PROPERTIES)
.build(),
extraProperties.keySet());

if (!illegalExtraProperties.isEmpty()) {
throw new TrinoException(
INVALID_TABLE_PROPERTY,
format("Illegal keys in extra_properties: %s", illegalExtraProperties));
}

return ImmutableMap.<String, String>builder()
.putAll(baseProperties)
.putAll(extraProperties)
.buildOrThrow();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8421,6 +8421,90 @@ public void testSystemTables()
assertQueryFails("TABLE \"nation$foo\"", "\\Qline 1:1: Table '%s.%s.\"nation$foo\"' does not exist".formatted(catalog, schema));
}

@Test
public void testExtraProperties()
{
    // extra_properties supplied at CREATE TABLE time must surface in the $properties metadata table
    String table = "test_create_table_with_multiple_extra_properties_" + randomNameSuffix();
    assertUpdate("CREATE TABLE " + table + " (c1 integer) WITH (extra_properties = MAP(ARRAY['extra.property.one', 'extra.property.two'], ARRAY['one', 'two']))");

    assertThat(query("SELECT key, value FROM \"" + table + "$properties\" WHERE key IN ('extra.property.one', 'extra.property.two')"))
            .skippingTypesCheck()
            .matches("VALUES ('extra.property.one', 'one'), ('extra.property.two', 'two')");

    assertUpdate("DROP TABLE " + table);
}

@Test
public void testReplaceTableExtraProperties()
{
    // CREATE OR REPLACE must discard the previous table's extra properties and keep only the new ones
    String table = "test_replace_table_with_multiple_extra_properties_" + randomNameSuffix();
    assertUpdate("CREATE TABLE " + table + " (c1 integer) WITH (extra_properties = MAP(ARRAY['extra.property.one', 'extra.property.two'], ARRAY['one', 'two']))");
    assertUpdate("CREATE OR REPLACE TABLE " + table + " (c1 integer) WITH (extra_properties = MAP(ARRAY['extra.property.three'], ARRAY['three']))");

    assertThat(query("SELECT key, value FROM \"" + table + "$properties\" WHERE key IN ('extra.property.one', 'extra.property.two', 'extra.property.three')"))
            .skippingTypesCheck()
            .matches("VALUES ('extra.property.three', 'three')");

    assertUpdate("DROP TABLE " + table);
}

@Test
public void testCreateTableAsSelectWithExtraProperties()
{
    // extra_properties must also work for CTAS, not just plain CREATE TABLE
    String table = "test_ctas_with_extra_properties_" + randomNameSuffix();
    assertUpdate("CREATE TABLE " + table + " WITH (extra_properties = MAP(ARRAY['extra.property.one', 'extra.property.two'], ARRAY['one', 'two'])) AS SELECT 1 as c1", 1);

    assertThat(query("SELECT key, value FROM \"" + table + "$properties\" WHERE key IN ('extra.property.one', 'extra.property.two')"))
            .skippingTypesCheck()
            .matches("VALUES ('extra.property.one', 'one'), ('extra.property.two', 'two')");

    assertUpdate("DROP TABLE " + table);
}

@Test
public void testShowCreateNotContainExtraProperties()
{
    // extra_properties is registered as a hidden property, so SHOW CREATE TABLE must not leak it
    String table = "test_show_create_table_with_extra_properties_" + randomNameSuffix();
    assertUpdate("CREATE TABLE " + table + " (c1 integer) WITH (extra_properties = MAP(ARRAY['extra.property.one', 'extra.property.two'], ARRAY['one', 'two']))");

    String showCreateTable = (String) computeScalar("SHOW CREATE TABLE " + table);
    assertThat(showCreateTable).doesNotContain("extra_properties", "extra.property.one", "extra.property.two");

    assertUpdate("DROP TABLE " + table);
}

@Test
public void testNullExtraProperty()
{
    // Null values in the extra_properties map are rejected by the property decoder,
    // both for CTAS and for plain CREATE TABLE
    assertQueryFails(
            "CREATE TABLE test_create_table_with_as_null_extra_properties WITH (extra_properties = MAP(ARRAY['null.property'], ARRAY[null])) AS SELECT 1 as c1",
            "\\Qline 1:73: Unable to set catalog 'iceberg' table property 'extra_properties' to [MAP(ARRAY['null.property'], ARRAY[null])]: Extra table property value cannot be null '{null.property=null}'\\E");

    assertQueryFails(
            "CREATE TABLE test_create_table_with_null_extra_properties (c1 integer) WITH (extra_properties = MAP(ARRAY['null.property'], ARRAY[null]))",
            "\\Qline 1:83: Unable to set catalog 'iceberg' table property 'extra_properties' to [MAP(ARRAY['null.property'], ARRAY[null])]: Extra table property value cannot be null '{null.property=null}'\\E");
}

@Test
public void testIllegalExtraPropertyKey()
{
    // Keys reserved by Trino table properties, the extra_properties key itself,
    // Iceberg-native keys, and the table comment must all be rejected
    String illegalKeyMessagePrefix = "\\QIllegal keys in extra_properties: [";

    assertQueryFails(
            "CREATE TABLE test_create_table_with_illegal_extra_properties (c1 integer) WITH (extra_properties = MAP(ARRAY['sorted_by'], ARRAY['id']))",
            illegalKeyMessagePrefix + "sorted_by]");

    assertQueryFails(
            "CREATE TABLE test_create_table_as_with_illegal_extra_properties WITH (extra_properties = MAP(ARRAY['extra_properties'], ARRAY['some_value'])) AS SELECT 1 as c1",
            illegalKeyMessagePrefix + "extra_properties]");

    assertQueryFails(
            "CREATE TABLE test_create_table_with_as_illegal_extra_properties WITH (extra_properties = MAP(ARRAY['write.format.default'], ARRAY['ORC'])) AS SELECT 1 as c1",
            illegalKeyMessagePrefix + "write.format.default]");

    assertQueryFails(
            "CREATE TABLE test_create_table_with_as_illegal_extra_properties WITH (extra_properties = MAP(ARRAY['comment'], ARRAY['some comment'])) AS SELECT 1 as c1",
            illegalKeyMessagePrefix + "comment]");
}

@Override
protected Optional<SetColumnTypeSetup> filterSetColumnTypesDataProvider(SetColumnTypeSetup setup)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,20 @@ public void testTrinoReadingSparkIcebergTablePropertiesData(StorageFormat storag
onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName);
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC})
public void testSparkReadingTrinoIcebergTablePropertiesData()
{
    // Extra properties written by Trino should be visible to Spark as plain Iceberg table properties
    String baseTableName = "test_spark_reading_trino_iceberg_table_properties" + randomNameSuffix();
    String trinoTable = trinoTableName(baseTableName);

    onTrino().executeQuery("CREATE TABLE " + trinoTable + " (doc_id VARCHAR) WITH (extra_properties = MAP(ARRAY['custom.table-property'], ARRAY['my_custom_value']))");

    String sparkTable = sparkTableName(baseTableName);
    assertThat(onSpark().executeQuery("SHOW TBLPROPERTIES " + sparkTable)).contains(row("custom.table-property", "my_custom_value"));

    onTrino().executeQuery("DROP TABLE " + trinoTable);
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion")
public void testTrinoReadingNestedSparkData(StorageFormat storageFormat, int specVersion)
{
Expand Down

0 comments on commit 4152f38

Please sign in to comment.