From cdb54d9bdc57d0036e82f3567c24133a4661b6ee Mon Sep 17 00:00:00 2001 From: FANNG Date: Wed, 31 Jul 2024 19:35:00 +0800 Subject: [PATCH] [#4062] feat(iceberg-rest-server): support RegisterTable for Iceberg REST server (#4088) ### What changes were proposed in this pull request? 1. upgrade Iceberg version to 1.5.2 2. support register table interface for Iceberg REST server ### Why are the changes needed? Fix: #4062 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? 1. add UT and IT 2. test in local env --------- Co-authored-by: Jerry Shao --- gradle/libs.versions.toml | 2 +- .../iceberg/common/ops/IcebergTableOps.java | 5 ++ .../common/utils/TestIcebergCatalogUtil.java | 2 +- .../rest/IcebergNamespaceOperations.java | 15 ++++++ .../test/IcebergRESTServiceIT.java | 47 ++++++++++++++--- .../service/rest/IcebergRestTestUtil.java | 2 +- .../service/rest/IcebergTableOpsForTest.java | 51 +++++++++++++++++++ .../iceberg/service/rest/IcebergTestBase.java | 13 +++-- .../rest/TestIcebergNamespaceOperations.java | 29 ++++++++++- .../rest/TestIcebergTableOperations.java | 13 ++--- 10 files changed, 157 insertions(+), 22 deletions(-) create mode 100644 iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOpsForTest.java diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index ad870912695..00626f15415 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -43,7 +43,7 @@ commons-collections4 = "4.4" commons-dbcp2 = "2.11.0" caffeine = "2.9.3" rocksdbjni = "7.10.2" -iceberg = '1.3.1' # used for Gravitino Iceberg catalog and Iceberg REST service +iceberg = '1.5.2' # used for Gravitino Iceberg catalog and Iceberg REST service iceberg4spark = "1.4.1" # used for compile spark connector paimon = '0.8.0' spark33 = "3.3.4" diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOps.java 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOps.java index 820914a4586..1f53b640ec7 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOps.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOps.java @@ -37,6 +37,7 @@ import org.apache.iceberg.rest.CatalogHandlers; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.RegisterTableRequest; import org.apache.iceberg.rest.requests.RenameTableRequest; import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; import org.apache.iceberg.rest.requests.UpdateTableRequest; @@ -111,6 +112,10 @@ public UpdateNamespacePropertiesResponse updateNamespaceProperties( asNamespaceCatalog, namespace, updateNamespacePropertiesRequest); } + public LoadTableResponse registerTable(Namespace namespace, RegisterTableRequest request) { + return CatalogHandlers.registerTable(catalog, namespace, request); + } + public LoadTableResponse createTable(Namespace namespace, CreateTableRequest request) { request.validate(); if (request.stageCreate()) { diff --git a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/utils/TestIcebergCatalogUtil.java b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/utils/TestIcebergCatalogUtil.java index 7a580e24d45..5cd34aeb1e6 100644 --- a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/utils/TestIcebergCatalogUtil.java +++ b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/utils/TestIcebergCatalogUtil.java @@ -60,7 +60,7 @@ void testLoadCatalog() { }); Map properties = new HashMap<>(); - properties.put(CatalogProperties.URI, "jdbc://0.0.0.0:3306"); + properties.put(CatalogProperties.URI, "jdbc:sqlite::memory:"); 
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "test"); properties.put(IcebergConstants.GRAVITINO_JDBC_DRIVER, "org.sqlite.JDBC"); properties.put(IcebergConstants.ICEBERG_JDBC_USER, "test"); diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java index 104c64029dc..b37fab80b90 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java @@ -40,10 +40,12 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.rest.RESTUtil; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; +import org.apache.iceberg.rest.requests.RegisterTableRequest; import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; import org.apache.iceberg.rest.responses.CreateNamespaceResponse; import org.apache.iceberg.rest.responses.GetNamespaceResponse; import org.apache.iceberg.rest.responses.ListNamespacesResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -122,4 +124,17 @@ public Response updateNamespace( icebergTableOps.updateNamespaceProperties(RESTUtil.decodeNamespace(namespace), request); return IcebergRestUtils.ok(response); } + + @POST + @Path("{namespace}/register") + @Produces(MediaType.APPLICATION_JSON) + @Timed(name = "register-table." 
+ MetricNames.HTTP_PROCESS_DURATION, absolute = true) + @ResponseMetered(name = "register-table", absolute = true) + public Response registerTable( + @PathParam("namespace") String namespace, RegisterTableRequest request) { + LOG.info("Register table, namespace: {}, request: {}", namespace, request); + LoadTableResponse response = + icebergTableOps.registerTable(RESTUtil.decodeNamespace(namespace), request); + return IcebergRestUtils.ok(response); + } } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceIT.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceIT.java index 6a674200869..eb196b3a444 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceIT.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceIT.java @@ -24,9 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.gravitino.iceberg.common.IcebergCatalogBackend; import org.apache.iceberg.exceptions.BadRequestException; -import org.apache.iceberg.exceptions.ServiceFailureException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; @@ -262,13 +260,8 @@ void testRenameTable() { "CREATE TABLE iceberg_rest_table_test.rename_foo1" + "(id bigint COMMENT 'unique id',data string) using iceberg"); - Class exception = - catalogType == IcebergCatalogBackend.HIVE - ? 
ServiceFailureException.class - : TableAlreadyExistsException.class; - Assertions.assertThrowsExactly( - exception, + TableAlreadyExistsException.class, () -> sql( "ALTER TABLE iceberg_rest_table_test.rename_foo2 " @@ -526,4 +519,42 @@ void testSnapshot() { convertToStringMap(sql("select * from iceberg_rest_table_test.snapshot_foo1")); Assertions.assertEquals(ImmutableMap.of("1", "a", "2", "b"), result); } + + @Test + @EnabledIf("catalogTypeNotMemory") + void testRegisterTable() { + String registerDB = "iceberg_register_db"; + String registerTableName = "register_foo1"; + sql("CREATE DATABASE " + registerDB); + sql( + String.format( + "CREATE TABLE %s.%s (id bigint COMMENT 'unique id',data string) USING iceberg", + registerDB, registerTableName)); + sql(String.format("INSERT INTO %s.%s VALUES (1, 'a')", registerDB, registerTableName)); + + // get metadata location + List<String> metadataLocations = + convertToStringList( + sql( + String.format( + "SELECT file FROM %s.%s.metadata_log_entries", registerDB, registerTableName)), + 0); + String metadataLocation = metadataLocations.get(metadataLocations.size() - 1); + + // register table + String register = + String.format( + "CALL rest.system.register_table(table => 'iceberg_rest_table_test.register_foo2', metadata_file=> '%s')", + metadataLocation); + sql(register); + + Map<String, String> result = + convertToStringMap(sql("SELECT * FROM iceberg_rest_table_test.register_foo2")); + Assertions.assertEquals(ImmutableMap.of("1", "a"), result); + + // insert other data + sql("INSERT INTO iceberg_rest_table_test.register_foo2 VALUES (2, 'b')"); + result = convertToStringMap(sql("SELECT * FROM iceberg_rest_table_test.register_foo2")); + Assertions.assertEquals(ImmutableMap.of("1", "a", "2", "b"), result); + } } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java index 
ed50607b97a..0e3d29e4975 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java @@ -66,7 +66,7 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe } if (bindIcebergTableOps) { - IcebergTableOps icebergTableOps = new IcebergTableOps(); + IcebergTableOps icebergTableOps = new IcebergTableOpsForTest(); IcebergMetricsManager icebergMetricsManager = new IcebergMetricsManager(new IcebergConfig()); resourceConfig.register( new AbstractBinder() { diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOpsForTest.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOpsForTest.java new file mode 100644 index 00000000000..af1ea314bf3 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOpsForTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.iceberg.service.rest; + +import com.google.common.collect.ImmutableMap; +import org.apache.gravitino.iceberg.common.ops.IcebergTableOps; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.rest.requests.RegisterTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; + +/** Test stub whose registerTable returns mock metadata instead of calling a catalog. */ +public class IcebergTableOpsForTest extends IcebergTableOps { + @Override + public LoadTableResponse registerTable(Namespace namespace, RegisterTableRequest request) { + // Names containing "fail" throw AlreadyExistsException so tests can hit the error path. + if (request.name().contains("fail")) { + throw new AlreadyExistsException("Already exists exception for test"); + } + + Schema mockSchema = new Schema(NestedField.of(1, false, "foo_string", StringType.get())); + TableMetadata tableMetadata = + TableMetadata.newTableMetadata( + mockSchema, PartitionSpec.unpartitioned(), "/mock", ImmutableMap.of()); + return LoadTableResponse.builder() + .withTableMetadata(tableMetadata) + .addAllConfig(ImmutableMap.of()) + .build(); + } +} diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTestBase.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTestBase.java index 0e7872604ec..ee672bf093f 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTestBase.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTestBase.java @@ -63,19 +63,24 @@ public Invocation.Builder getReportMetricsClientBuilder(String name) { } public Invocation.Builder getNamespaceClientBuilder() { - 
return getNamespaceClientBuilder(Optional.empty(), Optional.empty()); + return getNamespaceClientBuilder(Optional.empty(), Optional.empty(), Optional.empty()); } public Invocation.Builder getNamespaceClientBuilder(Optional<String> namespace) { - return getNamespaceClientBuilder(namespace, Optional.empty()); + return getNamespaceClientBuilder(namespace, Optional.empty(), Optional.empty()); } public Invocation.Builder getNamespaceClientBuilder( - Optional<String> namespace, Optional<Map<String, String>> queryParams) { + Optional<String> namespace, + Optional<String> extraPath, + Optional<Map<String, String>> queryParams) { String path = Joiner.on("/") .skipNulls() - .join(IcebergRestTestUtil.NAMESPACE_PATH, namespace.orElseGet(() -> null)); + .join( + IcebergRestTestUtil.NAMESPACE_PATH, + namespace.orElseGet(() -> null), + extraPath.orElseGet(() -> null)); return getIcebergClientBuilder(path, queryParams); } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergNamespaceOperations.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergNamespaceOperations.java index 4033dd4a46b..251f9e90709 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergNamespaceOperations.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergNamespaceOperations.java @@ -31,6 +31,8 @@ import javax.ws.rs.core.Response.Status; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; +import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest; +import org.apache.iceberg.rest.requests.RegisterTableRequest; import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; import org.apache.iceberg.rest.responses.CreateNamespaceResponse; import org.apache.iceberg.rest.responses.GetNamespaceResponse; @@ -61,12 +63,20 @@ private Response doCreateNamespace(String... 
name) { .post(Entity.entity(request, MediaType.APPLICATION_JSON_TYPE)); } + private Response doRegisterTable(String tableName) { + RegisterTableRequest request = + ImmutableRegisterTableRequest.builder().name(tableName).metadataLocation("mock").build(); + return getNamespaceClientBuilder( + Optional.of("register_ns"), Optional.of("register"), Optional.empty()) + .post(Entity.entity(request, MediaType.APPLICATION_JSON_TYPE)); + } + private Response doListNamespace(Optional<String> parent) { Optional<Map<String, String>> queryParam = parent.isPresent() ? Optional.of(ImmutableMap.of("parent", parent.get())) : Optional.empty(); - return getNamespaceClientBuilder(Optional.empty(), queryParam).get(); + return getNamespaceClientBuilder(Optional.empty(), Optional.empty(), queryParam).get(); } private Response doUpdateNamespace(String name) { @@ -121,6 +131,16 @@ protected void verifyCreateNamespaceSucc(String... name) { Assertions.assertEquals(namespaceResponse.properties(), properties); } + private void verifyRegisterTableSucc(String tableName) { + Response response = doRegisterTable(tableName); + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } + + private void verifyRegisterTableFail(int statusCode, String tableName) { + Response response = doRegisterTable(tableName); + Assertions.assertEquals(statusCode, response.getStatus()); + } + private void verifyCreateNamespaceFail(int statusCode, String... 
name) { Response response = doCreateNamespace(name); Assertions.assertEquals(statusCode, response.getStatus()); @@ -159,6 +179,13 @@ void testDropNamespace() { verifyDropNamespaceFail(500, ""); } + @Test + void testRegisterTable() { + verifyRegisterTableSucc("register_foo1"); + // Iceberg REST service will throw AlreadyExistsException in test if table name contains 'fail' + verifyRegisterTableFail(409, "fail_register_foo1"); + } + private void dropAllExistingNamespace() { Response response = doListNamespace(Optional.empty()); Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java index ccd60babf8f..e2d49d98528 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java @@ -20,6 +20,7 @@ package org.apache.gravitino.iceberg.service.rest; import com.google.common.collect.ImmutableSet; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -29,10 +30,10 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import org.apache.iceberg.MetadataUpdate; -import org.apache.iceberg.MetadataUpdate.AddSchema; -import org.apache.iceberg.MetadataUpdate.SetCurrentSchema; import org.apache.iceberg.Schema; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.UpdateRequirement; +import org.apache.iceberg.UpdateRequirements; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.metrics.CommitReport; @@ -107,10 +108,10 @@ private Response doLoadTable(String name) { } private 
Response doUpdateTable(String name, TableMetadata base) { - MetadataUpdate addSchema = new AddSchema(newTableSchema, base.lastColumnId()); - MetadataUpdate setCurrentSchema = new SetCurrentSchema(1); - UpdateTableRequest updateTableRequest = - UpdateTableRequest.builderFor(base).update(addSchema).update(setCurrentSchema).build(); + TableMetadata newMetadata = base.updateSchema(newTableSchema, base.lastColumnId()); + List<MetadataUpdate> metadataUpdates = newMetadata.changes(); + List<UpdateRequirement> requirements = UpdateRequirements.forUpdateTable(base, metadataUpdates); + UpdateTableRequest updateTableRequest = new UpdateTableRequest(requirements, metadataUpdates); return getTableClientBuilder(Optional.of(name)) .post(Entity.entity(updateTableRequest, MediaType.APPLICATION_JSON_TYPE)); }