Skip to content

Commit

Permalink
move mixed-format codes from core to a new module amoro-mixed-format-…
Browse files Browse the repository at this point in the history
…core
  • Loading branch information
baiyangtx committed Aug 17, 2024
1 parent 9def89f commit a4bd588
Show file tree
Hide file tree
Showing 264 changed files with 1,217 additions and 299 deletions.
6 changes: 6 additions & 0 deletions amoro-ams/amoro-ams-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.amoro</groupId>
<artifactId>amoro-mixed-format-core</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions amoro-ams/amoro-ams-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
<artifactId>amoro-ams-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.amoro</groupId>
<artifactId>amoro-mixed-format-core</artifactId>
</dependency>

<dependency>
<groupId>org.apache.amoro</groupId>
<artifactId>amoro-mixed-format-hive</artifactId>
Expand Down Expand Up @@ -329,6 +334,14 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.amoro</groupId>
<artifactId>amoro-mixed-format-core</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.amoro</groupId>
<artifactId>amoro-ams-api</artifactId>
Expand Down
53 changes: 0 additions & 53 deletions amoro-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,35 +74,6 @@
<artifactId>kryo</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-orc</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-common</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws</artifactId>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
Expand Down Expand Up @@ -156,30 +127,6 @@
<artifactId>lucene-core</artifactId>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
<version>${iceberg.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
<version>${iceberg.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;

/** Stores hadoop config files for {@link MixedTable} */
/** Stores hadoop config files */
public class TableMetaStore implements Serializable {
private static final long serialVersionUID = 1L;

Expand Down
172 changes: 7 additions & 165 deletions amoro-core/src/main/java/org/apache/amoro/utils/MixedCatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,12 @@
import org.apache.amoro.TableFormat;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.api.TableMeta;
import org.apache.amoro.io.AuthenticatedFileIO;
import org.apache.amoro.op.MixedHadoopTableOperations;
import org.apache.amoro.op.MixedTableOperations;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.table.TableIdentifier;
import org.apache.amoro.table.TableMetaStore;
import org.apache.amoro.table.TableProperties;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.Table;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.HadoopTableOperations;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -99,42 +83,6 @@ public static void mergeCatalogProperties(CatalogMeta meta, Map<String, String>
}
}

/**
 * Derives the property map used to initialize an Iceberg catalog.
 *
 * <p>The result is a copy of {@code properties}; the argument itself is left untouched. The
 * Iceberg catalog type entry is first set to {@code metastoreType} and then adjusted:
 *
 * <ul>
 *   <li>glue metastore (exact match): the catalog implementation is set to {@link GlueCatalog};
 *   <li>ams metastore (case-insensitive match): the warehouse location is set to {@code
 *       catalogName} and {@link RESTCatalog} becomes the implementation unless one is already
 *       present;
 *   <li>custom metastore (case-insensitive match): an implementation must already be present.
 * </ul>
 *
 * <p>Whenever an implementation entry ends up in the map, the catalog type entry is removed
 * again, so the result carries either the type or the implementation, never both.
 *
 * @param catalogName - catalog name
 * @param metastoreType - metastore type
 * @param properties - catalog properties
 * @return catalog properties with initialize properties.
 * @throws IllegalArgumentException if the metastore type is custom and no catalog implementation
 *     is configured
 */
public static Map<String, String> withIcebergCatalogInitializeProperties(
    String catalogName, String metastoreType, Map<String, String> properties) {
  Map<String, String> initProps = Maps.newHashMap(properties);
  initProps.put(org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE, metastoreType);

  boolean glueMetastore = CatalogMetaProperties.CATALOG_TYPE_GLUE.equals(metastoreType);
  boolean amsMetastore = CatalogMetaProperties.CATALOG_TYPE_AMS.equalsIgnoreCase(metastoreType);
  boolean customMetastore =
      CatalogMetaProperties.CATALOG_TYPE_CUSTOM.equalsIgnoreCase(metastoreType);

  if (glueMetastore) {
    initProps.put(CatalogProperties.CATALOG_IMPL, GlueCatalog.class.getName());
  }

  if (amsMetastore) {
    initProps.put(CatalogProperties.WAREHOUSE_LOCATION, catalogName);
    // Keep an explicitly configured implementation; REST is only the fallback.
    if (!initProps.containsKey(CatalogProperties.CATALOG_IMPL)) {
      initProps.put(CatalogProperties.CATALOG_IMPL, RESTCatalog.class.getName());
    }
  }

  if (customMetastore) {
    Preconditions.checkArgument(
        initProps.containsKey(CatalogProperties.CATALOG_IMPL),
        "Custom catalog properties must contains " + CatalogProperties.CATALOG_IMPL);
  }

  // An implementation entry takes the place of the type entry set at the top.
  boolean implConfigured = initProps.containsKey(CatalogProperties.CATALOG_IMPL);
  if (implConfigured) {
    initProps.remove(org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE);
  }

  return initProps;
}

/** Build {@link TableMetaStore} from catalog meta. */
public static TableMetaStore buildMetaStore(CatalogMeta catalogMeta) {
// load storage configs
Expand All @@ -154,7 +102,7 @@ public static TableMetaStore buildMetaStore(CatalogMeta catalogMeta) {
}

boolean loadAuthFromAMS =
PropertyUtil.propertyAsBoolean(
propertyAsBoolean(
catalogMeta.getCatalogProperties(),
CatalogMetaProperties.LOAD_AUTH_FROM_AMS,
CatalogMetaProperties.LOAD_AUTH_FROM_AMS_DEFAULT);
Expand Down Expand Up @@ -219,82 +167,6 @@ public static TableMetaStore buildMetaStore(CatalogMeta catalogMeta) {
return builder.build();
}

/**
 * Wrap table operation with authorization logic for {@link Table}.
 *
 * <p>Only {@link BaseTable} instances are handled; any other {@link Table} is returned as is. A
 * base table whose operations are already {@link MixedHadoopTableOperations} or {@link
 * MixedTableOperations} is returned unchanged. Operations of type {@link HadoopTableOperations}
 * are replaced by a {@link MixedHadoopTableOperations} built from {@code tableLocation}, {@code
 * authenticatedFileIO} and {@code configuration}; all other operations are wrapped in a {@link
 * MixedTableOperations} together with {@code authenticatedFileIO}. Re-created tables keep the
 * name of the input table.
 */
public static Table useMixedTableOperations(
    Table table,
    String tableLocation,
    AuthenticatedFileIO authenticatedFileIO,
    Configuration configuration) {
  // Only BaseTable instances are handled; everything else passes through untouched.
  if (!(table instanceof BaseTable)) {
    return table;
  }
  BaseTable icebergTable = (BaseTable) table;

  // Operations are already one of the Mixed* kinds, so there is nothing to replace.
  if (icebergTable.operations() instanceof MixedHadoopTableOperations
      || icebergTable.operations() instanceof MixedTableOperations) {
    return table;
  }

  if (icebergTable.operations() instanceof HadoopTableOperations) {
    MixedHadoopTableOperations hadoopOperations =
        new MixedHadoopTableOperations(
            new Path(tableLocation), authenticatedFileIO, configuration);
    return new BaseTable(hadoopOperations, table.name());
  }

  MixedTableOperations wrappedOperations =
      new MixedTableOperations(icebergTable.operations(), authenticatedFileIO);
  return new BaseTable(wrappedOperations, table.name());
}

/**
 * Merges the table-level properties configured on a catalog into the properties of a table.
 *
 * <p>Catalog entries whose key starts with {@link CatalogMetaProperties#TABLE_PROPERTIES_PREFIX}
 * are taken over with that prefix stripped. Of those, entries starting with {@link
 * CatalogMetaProperties#LOG_STORE_PROPERTIES_PREFIX} are dropped unless {@link
 * TableProperties#ENABLE_LOG_STORE} evaluates to true for the table properties, and entries
 * starting with {@link CatalogMetaProperties#OPTIMIZE_PROPERTIES_PREFIX} are dropped when
 * self-optimizing is disabled (looked up in the table properties first, then in the catalog-level
 * entries, then the default). Properties set on the table itself always override the
 * catalog-level ones.
 *
 * @param tableProperties properties in table
 * @param catalogProperties properties in catalog
 * @return merged table properties
 */
public static Map<String, String> mergeCatalogPropertiesToTable(
    Map<String, String> tableProperties, Map<String, String> catalogProperties) {
  // Catalog entries carrying the table prefix, re-keyed without that prefix.
  Map<String, String> mergedProperties =
      catalogProperties.entrySet().stream()
          .filter(
              entry -> entry.getKey().startsWith(CatalogMetaProperties.TABLE_PROPERTIES_PREFIX))
          .collect(
              Collectors.toMap(
                  entry ->
                      entry
                          .getKey()
                          .substring(CatalogMetaProperties.TABLE_PROPERTIES_PREFIX.length()),
                  Map.Entry::getValue));

  boolean logStoreEnabled =
      PropertyUtil.propertyAsBoolean(
          tableProperties,
          TableProperties.ENABLE_LOG_STORE,
          TableProperties.ENABLE_LOG_STORE_DEFAULT);
  if (!logStoreEnabled) {
    mergedProperties
        .keySet()
        .removeIf(key -> key.startsWith(CatalogMetaProperties.LOG_STORE_PROPERTIES_PREFIX));
  }

  // Table setting wins, then the catalog-level entry, then the default.
  String catalogLevelOptimizing =
      mergedProperties.getOrDefault(
          TableProperties.ENABLE_SELF_OPTIMIZING,
          String.valueOf(TableProperties.ENABLE_SELF_OPTIMIZING_DEFAULT));
  String optimizationEnabled =
      tableProperties.getOrDefault(
          TableProperties.ENABLE_SELF_OPTIMIZING, catalogLevelOptimizing);

  if (!Boolean.parseBoolean(optimizationEnabled)) {
    mergedProperties
        .keySet()
        .removeIf(key -> key.startsWith(CatalogMetaProperties.OPTIMIZE_PROPERTIES_PREFIX));
    // maintain 'optimize.enable' flag as false in table properties
    mergedProperties.put(TableProperties.ENABLE_SELF_OPTIMIZING, optimizationEnabled);
  }

  // Values set on the table itself take precedence over anything from the catalog.
  mergedProperties.putAll(tableProperties);
  return mergedProperties;
}

public static TableIdentifier tableId(TableMeta tableMeta) {
return TableIdentifier.of(
tableMeta.getTableIdentifier().getCatalog(),
Expand Down Expand Up @@ -342,42 +214,12 @@ public static <T> void copyProperty(
}
}

/**
* Build cache catalog.
*
* @param catalog The catalog of the wrap that needs to be cached
* @param properties table properties
* @return If Cache is enabled, CachingCatalog is returned, otherwise, the input catalog is
* returned
*/
public static Catalog buildCacheCatalog(Catalog catalog, Map<String, String> properties) {
boolean cacheEnabled =
PropertyUtil.propertyAsBoolean(
properties, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);

boolean cacheCaseSensitive =
PropertyUtil.propertyAsBoolean(
properties,
CatalogProperties.CACHE_CASE_SENSITIVE,
CatalogProperties.CACHE_CASE_SENSITIVE_DEFAULT);

long cacheExpirationIntervalMs =
PropertyUtil.propertyAsLong(
properties,
CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);

// An expiration interval of 0ms effectively disables caching.
// Do not wrap with CachingCatalog.
if (cacheExpirationIntervalMs <= 0) {
LOG.warn(
"Configuration `{}` is {}, less than or equal to 0, then the cache will not take effect.",
CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
cacheExpirationIntervalMs);
cacheEnabled = false;
private static boolean propertyAsBoolean(
Map<String, String> properties, String property, boolean defaultValue) {
String value = properties.get(property);
if (value != null) {
return Boolean.parseBoolean(value);
}
return cacheEnabled
? CachingCatalog.wrap(catalog, cacheCaseSensitive, cacheExpirationIntervalMs)
: catalog;
return defaultValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,4 @@
# limitations under the License.
#

org.apache.amoro.formats.iceberg.IcebergCatalogFactory
org.apache.amoro.formats.paimon.PaimonCatalogFactory
org.apache.amoro.formats.mixed.MixedIcebergCatalogFactory
org.apache.amoro.formats.mixed.MixedHiveCatalogFactory
org.apache.amoro.formats.paimon.PaimonCatalogFactory
Loading

0 comments on commit a4bd588

Please sign in to comment.