Skip to content

Commit

Permalink
[apache#3371] feat(flink-connector): support table operation
Browse files Browse the repository at this point in the history
  • Loading branch information
coolderli committed Jun 21, 2024
1 parent 671bdd1 commit 14f3ca1
Showing 1 changed file with 51 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,25 @@
package com.datastrato.gravitino.flink.connector.catalog;

import com.datastrato.gravitino.Catalog;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.Schema;
import com.datastrato.gravitino.SchemaChange;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.flink.connector.PropertiesConverter;
import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.rel.TableChange;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
Expand Down Expand Up @@ -135,8 +141,14 @@ public void alterDatabase(
}

@Override
public List<String> listTables(String s) throws DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
public List<String> listTables(String database) throws DatabaseNotExistException, CatalogException {
try {
return Stream.of(catalog().asTableCatalog().listTables(Namespace.of(getName(), database)))
.map(NameIdentifier::name)
.collect(Collectors.toList());
} catch (NoSuchSchemaException e) {
throw new DatabaseNotExistException(getName(), database);
}
}

@Override
Expand All @@ -147,24 +159,53 @@ public List<String> listViews(String s) throws DatabaseNotExistException, Catalo
@Override
public CatalogBaseTable getTable(ObjectPath objectPath)
throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
try {
Table table = catalog().asTableCatalog()
.loadTable(NameIdentifier.of(
Namespace.of(getName(), objectPath.getDatabaseName()),
objectPath.getObjectName()));
return convertToFlinkTable(table);
} catch (NoSuchCatalogException e) {
throw new CatalogException(e);
}
}

@Override
public boolean tableExists(ObjectPath objectPath) throws CatalogException {
throw new UnsupportedOperationException();
return catalog().asTableCatalog().tableExists(NameIdentifier.of(
Namespace.of(getName(), objectPath.getDatabaseName()), objectPath.getObjectName()));
}

@Override
public void dropTable(ObjectPath objectPath, boolean b)
public void dropTable(ObjectPath objectPath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
boolean dropped = catalog().asTableCatalog().dropTable(NameIdentifier.of(
Namespace.of(getName(), objectPath.getDatabaseName()), objectPath.getObjectName()));
if (!dropped && !ignoreIfNotExists) {
throw new TableNotExistException(getName(), objectPath);
}
}

@Override
public void renameTable(ObjectPath objectPath, String s, boolean b)
public void renameTable(ObjectPath objectPath, String newTableName, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException, CatalogException {
throw new UnsupportedOperationException();
NameIdentifier identifier = NameIdentifier.of(
Namespace.of(getName(), objectPath.getDatabaseName()), objectPath.getObjectName());

if (catalog().asTableCatalog().tableExists(identifier)) {
throw new TableAlreadyExistException(getName(), objectPath);
}

try {
catalog().asTableCatalog().alterTable(
NameIdentifier.of(
Namespace.of(getName(), objectPath.getDatabaseName()), objectPath.getObjectName()),
TableChange.rename(newTableName));
} catch (NoSuchCatalogException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(getName(), objectPath);
}
}
}

@Override
Expand Down Expand Up @@ -337,6 +378,8 @@ public void alterPartitionColumnStatistics(

protected abstract PropertiesConverter getPropertiesConverter();

protected abstract CatalogBaseTable convertToFlinkTable(Table table);

@VisibleForTesting
static SchemaChange[] getSchemaChange(CatalogDatabase current, CatalogDatabase updated) {
Map<String, String> currentProperties = current.getProperties();
Expand Down

0 comments on commit 14f3ca1

Please sign in to comment.