Skip to content

Commit

Permalink
[#4062] feat(iceberg-rest-server): support RegisterTable for Iceberg …
Browse files Browse the repository at this point in the history
…REST server (#4088)

### What changes were proposed in this pull request?
1. upgrade Iceberg version to 1.5.2
2. support register table interface for Iceberg REST server

### Why are the changes needed?

Fix: #4062 

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
1. add UT and IT
2. test in local env

---------

Co-authored-by: Jerry Shao <[email protected]>
  • Loading branch information
FANNG1 and jerryshao authored Jul 31, 2024
1 parent fee9afa commit cdb54d9
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 22 deletions.
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ commons-collections4 = "4.4"
commons-dbcp2 = "2.11.0"
caffeine = "2.9.3"
rocksdbjni = "7.10.2"
iceberg = '1.3.1' # used for Gravitino Iceberg catalog and Iceberg REST service
iceberg = '1.5.2' # used for Gravitino Iceberg catalog and Iceberg REST service
iceberg4spark = "1.4.1" # used for compile spark connector
paimon = '0.8.0'
spark33 = "3.3.4"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.iceberg.rest.CatalogHandlers;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
Expand Down Expand Up @@ -111,6 +112,10 @@ public UpdateNamespacePropertiesResponse updateNamespaceProperties(
asNamespaceCatalog, namespace, updateNamespacePropertiesRequest);
}

public LoadTableResponse registerTable(Namespace namespace, RegisterTableRequest request) {
return CatalogHandlers.registerTable(catalog, namespace, request);
}

public LoadTableResponse createTable(Namespace namespace, CreateTableRequest request) {
request.validate();
if (request.stageCreate()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void testLoadCatalog() {
});

Map<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.URI, "jdbc://0.0.0.0:3306");
properties.put(CatalogProperties.URI, "jdbc:sqlite::memory:");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "test");
properties.put(IcebergConstants.GRAVITINO_JDBC_DRIVER, "org.sqlite.JDBC");
properties.put(IcebergConstants.ICEBERG_JDBC_USER, "test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
import org.apache.iceberg.rest.responses.GetNamespaceResponse;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -122,4 +124,17 @@ public Response updateNamespace(
icebergTableOps.updateNamespaceProperties(RESTUtil.decodeNamespace(namespace), request);
return IcebergRestUtils.ok(response);
}

@POST
@Path("{namespace}/register")
@Produces(MediaType.APPLICATION_JSON)
@Timed(name = "register-table." + MetricNames.HTTP_PROCESS_DURATION, absolute = true)
@ResponseMetered(name = "register-table", absolute = true)
public Response registerTable(
@PathParam("namespace") String namespace, RegisterTableRequest request) {
LOG.info("Register table, namespace: {}, request: {}", namespace, request);
LoadTableResponse response =
icebergTableOps.registerTable(RESTUtil.decodeNamespace(namespace), request);
return IcebergRestUtils.ok(response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
import org.apache.iceberg.exceptions.BadRequestException;
import org.apache.iceberg.exceptions.ServiceFailureException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
Expand Down Expand Up @@ -262,13 +260,8 @@ void testRenameTable() {
"CREATE TABLE iceberg_rest_table_test.rename_foo1"
+ "(id bigint COMMENT 'unique id',data string) using iceberg");

Class exception =
catalogType == IcebergCatalogBackend.HIVE
? ServiceFailureException.class
: TableAlreadyExistsException.class;

Assertions.assertThrowsExactly(
exception,
TableAlreadyExistsException.class,
() ->
sql(
"ALTER TABLE iceberg_rest_table_test.rename_foo2 "
Expand Down Expand Up @@ -526,4 +519,42 @@ void testSnapshot() {
convertToStringMap(sql("select * from iceberg_rest_table_test.snapshot_foo1"));
Assertions.assertEquals(ImmutableMap.of("1", "a", "2", "b"), result);
}

@Test
@EnabledIf("catalogTypeNotMemory")
void testRegisterTable() {
String registerDB = "iceberg_register_db";
String registerTableName = "register_foo1";
sql("CREATE DATABASE " + registerDB);
sql(
String.format(
"CREATE TABLE %s.%s (id bigint COMMENT 'unique id',data string) USING iceberg",
registerDB, registerTableName));
sql(String.format("INSERT INTO %s.%s VALUES (1, 'a')", registerDB, registerTableName));

// get metadata location
List<String> metadataLocations =
convertToStringList(
sql(
String.format(
"SELECT file FROM %s.%s.metadata_log_entries", registerDB, registerTableName)),
0);
String metadataLocation = metadataLocations.get(metadataLocations.size() - 1);

// register table
String register =
String.format(
"CALL rest.system.register_table(table => 'iceberg_rest_table_test.register_foo2', metadata_file=> '%s')",
metadataLocation);
sql(register);

Map<String, String> result =
convertToStringMap(sql("SELECT * FROM iceberg_rest_table_test.register_foo2"));
Assertions.assertEquals(ImmutableMap.of("1", "a"), result);

// insert other data
sql("INSERT INTO iceberg_rest_table_test.register_foo2 VALUES (2, 'b')");
result = convertToStringMap(sql("SELECT * FROM iceberg_rest_table_test.register_foo2"));
Assertions.assertEquals(ImmutableMap.of("1", "a", "2", "b"), result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe
}

if (bindIcebergTableOps) {
IcebergTableOps icebergTableOps = new IcebergTableOps();
IcebergTableOps icebergTableOps = new IcebergTableOpsForTest();
IcebergMetricsManager icebergMetricsManager = new IcebergMetricsManager(new IcebergConfig());
resourceConfig.register(
new AbstractBinder() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.iceberg.service.rest;

import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StringType;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

public class IcebergTableOpsForTest extends IcebergTableOps {
@Override
public LoadTableResponse registerTable(Namespace namespace, RegisterTableRequest request) {
if (request.name().contains("fail")) {
throw new AlreadyExistsException("Already exits exception for test");
}

Schema mockSchema = new Schema(NestedField.of(1, false, "foo_string", StringType.get()));
TableMetadata tableMetadata =
TableMetadata.newTableMetadata(
mockSchema, PartitionSpec.unpartitioned(), "/mock", ImmutableMap.of());
LoadTableResponse loadTableResponse =
LoadTableResponse.builder()
.withTableMetadata(tableMetadata)
.addAllConfig(ImmutableMap.of())
.build();
return loadTableResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,24 @@ public Invocation.Builder getReportMetricsClientBuilder(String name) {
}

public Invocation.Builder getNamespaceClientBuilder() {
return getNamespaceClientBuilder(Optional.empty(), Optional.empty());
return getNamespaceClientBuilder(Optional.empty(), Optional.empty(), Optional.empty());
}

public Invocation.Builder getNamespaceClientBuilder(Optional<String> namespace) {
return getNamespaceClientBuilder(namespace, Optional.empty());
return getNamespaceClientBuilder(namespace, Optional.empty(), Optional.empty());
}

public Invocation.Builder getNamespaceClientBuilder(
Optional<String> namespace, Optional<Map<String, String>> queryParams) {
Optional<String> namespace,
Optional<String> extraPath,
Optional<Map<String, String>> queryParams) {
String path =
Joiner.on("/")
.skipNulls()
.join(IcebergRestTestUtil.NAMESPACE_PATH, namespace.orElseGet(() -> null));
.join(
IcebergRestTestUtil.NAMESPACE_PATH,
namespace.orElseGet(() -> null),
extraPath.orElseGet(() -> null));
return getIcebergClientBuilder(path, queryParams);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import javax.ws.rs.core.Response.Status;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
import org.apache.iceberg.rest.responses.GetNamespaceResponse;
Expand Down Expand Up @@ -61,12 +63,20 @@ private Response doCreateNamespace(String... name) {
.post(Entity.entity(request, MediaType.APPLICATION_JSON_TYPE));
}

private Response doRegisterTable(String tableName) {
RegisterTableRequest request =
ImmutableRegisterTableRequest.builder().name(tableName).metadataLocation("mock").build();
return getNamespaceClientBuilder(
Optional.of("register_ns"), Optional.of("register"), Optional.empty())
.post(Entity.entity(request, MediaType.APPLICATION_JSON_TYPE));
}

private Response doListNamespace(Optional<String> parent) {
Optional<Map<String, String>> queryParam =
parent.isPresent()
? Optional.of(ImmutableMap.of("parent", parent.get()))
: Optional.empty();
return getNamespaceClientBuilder(Optional.empty(), queryParam).get();
return getNamespaceClientBuilder(Optional.empty(), Optional.empty(), queryParam).get();
}

private Response doUpdateNamespace(String name) {
Expand Down Expand Up @@ -121,6 +131,16 @@ protected void verifyCreateNamespaceSucc(String... name) {
Assertions.assertEquals(namespaceResponse.properties(), properties);
}

private void verifyRegisterTableSucc(String tableName) {
Response response = doRegisterTable(tableName);
Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus());
}

private void verifyRegisterTableFail(int statusCode, String tableName) {
Response response = doRegisterTable(tableName);
Assertions.assertEquals(statusCode, response.getStatus());
}

private void verifyCreateNamespaceFail(int statusCode, String... name) {
Response response = doCreateNamespace(name);
Assertions.assertEquals(statusCode, response.getStatus());
Expand Down Expand Up @@ -159,6 +179,13 @@ void testDropNamespace() {
verifyDropNamespaceFail(500, "");
}

@Test
void testRegisterTable() {
verifyRegisterTableSucc("register_foo1");
// Iceberg REST service will throw AlreadyExistsException in test if table name contains 'fail'
verifyRegisterTableFail(409, "fail_register_foo1");
}

private void dropAllExistingNamespace() {
Response response = doListNamespace(Optional.empty());
Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.gravitino.iceberg.service.rest;

import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -29,10 +30,10 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.MetadataUpdate.AddSchema;
import org.apache.iceberg.MetadataUpdate.SetCurrentSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.UpdateRequirement;
import org.apache.iceberg.UpdateRequirements;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.metrics.CommitReport;
Expand Down Expand Up @@ -107,10 +108,10 @@ private Response doLoadTable(String name) {
}

private Response doUpdateTable(String name, TableMetadata base) {
MetadataUpdate addSchema = new AddSchema(newTableSchema, base.lastColumnId());
MetadataUpdate setCurrentSchema = new SetCurrentSchema(1);
UpdateTableRequest updateTableRequest =
UpdateTableRequest.builderFor(base).update(addSchema).update(setCurrentSchema).build();
TableMetadata newMetadata = base.updateSchema(newTableSchema, base.lastColumnId());
List<MetadataUpdate> metadataUpdates = newMetadata.changes();
List<UpdateRequirement> requirements = UpdateRequirements.forUpdateTable(base, metadataUpdates);
UpdateTableRequest updateTableRequest = new UpdateTableRequest(requirements, metadataUpdates);
return getTableClientBuilder(Optional.of(name))
.post(Entity.entity(updateTableRequest, MediaType.APPLICATION_JSON_TYPE));
}
Expand Down

0 comments on commit cdb54d9

Please sign in to comment.