[#3980] feat(flink-connector): Support partition for hive table operation (#4096)

### What changes were proposed in this pull request?

- Support partitions for Hive tables in the Flink connector (see the sketch below)

### Why are the changes needed?

- Fix: #3980 

### Does this PR introduce _any_ user-facing change?

- No

### How was this patch tested?

- Added integration tests (ITs)
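
The user-visible effect is that partition keys declared in Flink DDL now reach Gravitino (and Hive) instead of being dropped. A minimal sketch, assuming a Gravitino-backed Hive catalog already registered under the illustrative name `gravitino_hive` with a database `db` (both names are assumptions, not part of this PR):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionedHiveTableExample {
  public static void main(String[] args) {
    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

    // "gravitino_hive" and "db" are illustrative names.
    tableEnv.useCatalog("gravitino_hive");
    tableEnv.useDatabase("db");

    // The PARTITIONED BY clause is converted into Gravitino identity
    // transforms when the table is created through the connector.
    tableEnv.executeSql(
        "CREATE TABLE users (id BIGINT, name STRING, dt STRING) PARTITIONED BY (dt)");
  }
}
```
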
coolderli authored Jul 22, 2024
1 parent 1e8511a commit 529429e
Showing 5 changed files with 234 additions and 35 deletions.
File: DefaultPartitionConverter.java (new)
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector;

import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.expressions.transforms.Transforms;

public class DefaultPartitionConverter implements PartitionConverter {

  private DefaultPartitionConverter() {}

  public static final DefaultPartitionConverter INSTANCE = new DefaultPartitionConverter();

  @Override
  public List<String> toFlinkPartitionKeys(Transform[] partitions) {
    List<String> partitionKeys =
        Arrays.stream(partitions)
            .filter(t -> t instanceof Transforms.IdentityTransform)
            .flatMap(t -> Arrays.stream(((Transforms.IdentityTransform) t).fieldName()))
            .collect(Collectors.toList());
    Preconditions.checkArgument(
        partitionKeys.size() == partitions.length,
        "Flink only supports identity transforms for now.");
    return partitionKeys;
  }

  @Override
  public Transform[] toGravitinoPartitions(List<String> partitionKeys) {
    return partitionKeys.stream().map(Transforms::identity).toArray(Transform[]::new);
  }
}
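
A quick sketch of the round trip this converter performs (the `main` wrapper and the key names are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.rel.expressions.transforms.Transform;

public class PartitionConverterDemo {
  public static void main(String[] args) {
    // Flink partition keys become Gravitino identity transforms...
    Transform[] transforms =
        DefaultPartitionConverter.INSTANCE.toGravitinoPartitions(Arrays.asList("dt", "hour"));

    // ...and identity transforms map back to the same keys. Anything other
    // than an identity transform makes toFlinkPartitionKeys throw, via the
    // Preconditions size check above.
    List<String> keys = DefaultPartitionConverter.INSTANCE.toFlinkPartitionKeys(transforms);
    System.out.println(keys); // prints [dt, hour]
  }
}
```
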
File: PartitionConverter.java (new)
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.flink.connector;

import java.util.List;
import org.apache.gravitino.rel.expressions.transforms.Transform;

/**
 * The PartitionConverter is used to convert partitions between Flink and Gravitino. Flink only
 * supports identity transforms, but some table formats, such as Apache Paimon, store the
 * partition transform in table properties, so this interface can be implemented to support
 * additional partition transforms.
 */
public interface PartitionConverter {
  /**
   * Convert Gravitino partition transforms to Flink partition keys.
   *
   * @param partitions The partition transforms in Gravitino.
   * @return The partition keys in Flink.
   */
  List<String> toFlinkPartitionKeys(Transform[] partitions);

  /**
   * Convert Flink partition keys to Gravitino partition transforms.
   *
   * @param partitionKeys The partition keys in Flink.
   * @return The partition transforms in Gravitino.
   */
  Transform[] toGravitinoPartitions(List<String> partitionKeys);
}
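
As the Javadoc above suggests, a format that encodes partitioning differently could plug in its own converter. A hypothetical sketch (this class is illustrative, not part of the PR) that skips unsupported transforms instead of failing:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.expressions.transforms.Transforms;

public class LenientPartitionConverter implements PartitionConverter {

  @Override
  public List<String> toFlinkPartitionKeys(Transform[] partitions) {
    // Unlike DefaultPartitionConverter, silently drop non-identity
    // transforms rather than rejecting the whole table.
    return Arrays.stream(partitions)
        .filter(t -> t instanceof Transforms.IdentityTransform)
        .flatMap(t -> Arrays.stream(((Transforms.IdentityTransform) t).fieldName()))
        .collect(Collectors.toList());
  }

  @Override
  public Transform[] toGravitinoPartitions(List<String> partitionKeys) {
    return partitionKeys.stream().map(Transforms::identity).toArray(Transform[]::new);
  }
}
```
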
File: BaseCatalog.java
@@ -20,7 +20,6 @@
 package org.apache.gravitino.flink.connector.catalog;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 import java.util.Arrays;
@@ -67,24 +66,30 @@
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.flink.connector.PartitionConverter;
 import org.apache.gravitino.flink.connector.PropertiesConverter;
 import org.apache.gravitino.flink.connector.utils.TypeUtils;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
 
 /**
  * The BaseCatalog that provides a default implementation for all methods in the {@link
  * org.apache.flink.table.catalog.Catalog} interface.
  */
 public abstract class BaseCatalog extends AbstractCatalog {
   private final PropertiesConverter propertiesConverter;
+  private final PartitionConverter partitionConverter;
 
   protected BaseCatalog(String catalogName, String defaultDatabase) {
     super(catalogName, defaultDatabase);
     this.propertiesConverter = getPropertiesConverter();
+    this.partitionConverter = getPartitionConverter();
   }
 
+  protected abstract AbstractCatalog realCatalog();
+
   @Override
   public void open() throws CatalogException {}
 
@@ -260,8 +265,10 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
     String comment = table.getComment();
     Map<String, String> properties =
         propertiesConverter.toGravitinoTableProperties(table.getOptions());
+    Transform[] partitions =
+        partitionConverter.toGravitinoPartitions(((CatalogTable) table).getPartitionKeys());
     try {
-      catalog().asTableCatalog().createTable(identifier, columns, comment, properties);
+      catalog().asTableCatalog().createTable(identifier, columns, comment, properties, partitions);
     } catch (NoSuchSchemaException e) {
       throw new DatabaseNotExistException(catalogName(), tablePath.getDatabaseName(), e);
     } catch (TableAlreadyExistsException e) {
@@ -280,37 +287,36 @@ public void alterTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable,
   }
 
   @Override
-  public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath)
+  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
       throws TableNotExistException, TableNotPartitionedException, CatalogException {
-    throw new UnsupportedOperationException();
+    return realCatalog().listPartitions(tablePath);
   }
 
   @Override
   public List<CatalogPartitionSpec> listPartitions(
-      ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec)
+      ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
       throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException,
           CatalogException {
-    throw new UnsupportedOperationException();
+    return realCatalog().listPartitions(tablePath, partitionSpec);
   }
 
   @Override
   public List<CatalogPartitionSpec> listPartitionsByFilter(
-      ObjectPath objectPath, List<Expression> list)
+      ObjectPath tablePath, List<Expression> filter)
       throws TableNotExistException, TableNotPartitionedException, CatalogException {
-    throw new UnsupportedOperationException();
+    return realCatalog().listPartitionsByFilter(tablePath, filter);
   }
 
   @Override
-  public CatalogPartition getPartition(
-      ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec)
+  public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
       throws PartitionNotExistException, CatalogException {
-    throw new UnsupportedOperationException();
+    return realCatalog().getPartition(tablePath, partitionSpec);
   }
 
   @Override
-  public boolean partitionExists(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec)
+  public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
       throws CatalogException {
-    throw new UnsupportedOperationException();
+    return realCatalog().partitionExists(tablePath, partitionSpec);
   }
 
   @Override
@@ -347,9 +353,9 @@ public List<String> listFunctions(String s) throws DatabaseNotExistException, Ca
   }
 
   @Override
-  public CatalogFunction getFunction(ObjectPath objectPath)
+  public CatalogFunction getFunction(ObjectPath tablePath)
       throws FunctionNotExistException, CatalogException {
-    throw new UnsupportedOperationException();
+    return realCatalog().getFunction(tablePath);
   }
 
   @Override
@@ -376,29 +382,29 @@ public void dropFunction(ObjectPath objectPath, boolean b)
   }
 
   @Override
-  public CatalogTableStatistics getTableStatistics(ObjectPath objectPath)
+  public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
       throws TableNotExistException, CatalogException {
-    throw new UnsupportedOperationException();
+    return realCatalog().getTableStatistics(tablePath);
   }
 
   @Override
-  public CatalogColumnStatistics getTableColumnStatistics(ObjectPath objectPath)
+  public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
       throws TableNotExistException, CatalogException {
-    throw new UnsupportedOperationException();
+    return realCatalog().getTableColumnStatistics(tablePath);
   }
 
   @Override
   public CatalogTableStatistics getPartitionStatistics(
-      ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec)
+      ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
       throws PartitionNotExistException, CatalogException {
-    throw new UnsupportedOperationException();
+    return realCatalog().getPartitionStatistics(tablePath, partitionSpec);
   }
 
   @Override
   public CatalogColumnStatistics getPartitionColumnStatistics(
-      ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec)
+      ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
       throws PartitionNotExistException, CatalogException {
-    throw new UnsupportedOperationException();
+    return realCatalog().getPartitionColumnStatistics(tablePath, partitionSpec);
   }
 
   @Override
@@ -437,6 +443,8 @@ public void alterPartitionColumnStatistics(
 
   protected abstract PropertiesConverter getPropertiesConverter();
 
+  protected abstract PartitionConverter getPartitionConverter();
+
   protected CatalogBaseTable toFlinkTable(Table table) {
     org.apache.flink.table.api.Schema.Builder builder =
         org.apache.flink.table.api.Schema.newBuilder();
@@ -448,8 +456,8 @@ protected CatalogBaseTable toFlinkTable(Table table) {
     }
     Map<String, String> flinkTableProperties =
         propertiesConverter.toFlinkTableProperties(table.properties());
-    return CatalogTable.of(
-        builder.build(), table.comment(), ImmutableList.of(), flinkTableProperties);
+    List<String> partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning());
+    return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties);
   }
 
   private Column toGravitinoColumn(org.apache.flink.table.catalog.Column column) {
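
In the diff above, methods that previously threw UnsupportedOperationException now delegate to the underlying Flink catalog exposed through the new realCatalog() hook. A minimal sketch of what a BaseCatalog subclass supplies (the class below is illustrative; it uses Flink's in-memory catalog as the delegate and leaves the properties converter to a further subclass):

```java
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;

public abstract class InMemoryGravitinoCatalog extends BaseCatalog {
  private final GenericInMemoryCatalog delegate;

  protected InMemoryGravitinoCatalog(String catalogName, String defaultDatabase) {
    super(catalogName, defaultDatabase);
    this.delegate = new GenericInMemoryCatalog(catalogName, defaultDatabase);
  }

  @Override
  protected AbstractCatalog realCatalog() {
    // Partition, statistics, and function lookups are forwarded here.
    return delegate;
  }

  @Override
  protected PartitionConverter getPartitionConverter() {
    // Safe as a singleton: BaseCatalog calls this from its constructor,
    // before subclass fields (like "delegate") are initialized.
    return DefaultPartitionConverter.INSTANCE;
  }
}
```

One subtlety the sketch comments on: BaseCatalog resolves both converters inside its constructor, so getPartitionConverter() implementations should not rely on subclass state.
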
File: GravitinoHiveCatalog.java
@@ -20,13 +20,12 @@
 
 import java.util.Optional;
 import javax.annotation.Nullable;
-import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
-import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.factories.Factory;
+import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
+import org.apache.gravitino.flink.connector.PartitionConverter;
 import org.apache.gravitino.flink.connector.PropertiesConverter;
 import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -75,14 +74,12 @@ protected PropertiesConverter getPropertiesConverter() {
   }
 
   @Override
-  public CatalogTableStatistics getTableStatistics(ObjectPath objectPath)
-      throws TableNotExistException, CatalogException {
-    return hiveCatalog.getTableStatistics(objectPath);
+  protected PartitionConverter getPartitionConverter() {
+    return DefaultPartitionConverter.INSTANCE;
   }
 
   @Override
-  public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
-      throws TableNotExistException, CatalogException {
-    return hiveCatalog.getTableColumnStatistics(tablePath);
+  protected AbstractCatalog realCatalog() {
+    return hiveCatalog;
   }
 }
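
With those two overrides in place, partition metadata surfaces through standard Flink catalog calls. A hedged sketch of the kind of check the new ITs would exercise (the database and table names are illustrative):

```java
import java.util.List;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;

public class ListPartitionsDemo {
  // Assumes "catalog" is a Gravitino-backed Hive catalog and that the
  // partitioned table from the earlier sketch already has data.
  static void printPartitions(Catalog catalog) throws Exception {
    List<CatalogPartitionSpec> specs = catalog.listPartitions(new ObjectPath("db", "users"));
    // Each spec maps partition column to value, e.g. {dt=2024-07-22}.
    specs.forEach(spec -> System.out.println(spec.getPartitionSpec()));
  }
}
```
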
[Fifth changed file (the new integration tests) is not rendered in this view.]
