Skip to content

Commit

Permalink
Core: Add REST spec and request for commits to multiple tables (#7741)
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra authored Jun 9, 2023
1 parent 2a83d9e commit cdc1a27
Show file tree
Hide file tree
Showing 7 changed files with 521 additions and 1 deletion.
27 changes: 26 additions & 1 deletion core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.TableIdentifierParser;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.apache.iceberg.rest.requests.CommitTransactionRequestParser;
import org.apache.iceberg.rest.requests.ImmutableReportMetricsRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequestParser;
Expand Down Expand Up @@ -83,7 +85,10 @@ public static void registerAll(ObjectMapper mapper) {
.addDeserializer(ReportMetricsRequest.class, new ReportMetricsRequestDeserializer<>())
.addSerializer(ImmutableReportMetricsRequest.class, new ReportMetricsRequestSerializer<>())
.addDeserializer(
ImmutableReportMetricsRequest.class, new ReportMetricsRequestDeserializer<>());
ImmutableReportMetricsRequest.class, new ReportMetricsRequestDeserializer<>())
.addSerializer(CommitTransactionRequest.class, new CommitTransactionRequestSerializer())
.addDeserializer(
CommitTransactionRequest.class, new CommitTransactionRequestDeserializer());
mapper.registerModule(module);
}

Expand Down Expand Up @@ -280,4 +285,24 @@ public T deserialize(JsonParser p, DeserializationContext context) throws IOExce
return (T) ReportMetricsRequestParser.fromJson(jsonNode);
}
}

public static class CommitTransactionRequestSerializer
extends JsonSerializer<CommitTransactionRequest> {
@Override
public void serialize(
CommitTransactionRequest request, JsonGenerator gen, SerializerProvider serializers)
throws IOException {
CommitTransactionRequestParser.toJson(request, gen);
}
}

public static class CommitTransactionRequestDeserializer
extends JsonDeserializer<CommitTransactionRequest> {
@Override
public CommitTransactionRequest deserialize(JsonParser p, DeserializationContext context)
throws IOException {
JsonNode jsonNode = p.getCodec().readTree(p);
return CommitTransactionRequestParser.fromJson(jsonNode);
}
}
}
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,8 @@ public String metrics(TableIdentifier identifier) {
RESTUtil.encodeString(identifier.name()),
"metrics");
}

public String commitTransaction() {
return SLASH.join("v1", prefix, "transactions", "commit");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.requests;

import java.util.List;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.rest.RESTRequest;

public class CommitTransactionRequest implements RESTRequest {
private final List<UpdateTableRequest> tableChanges;

public CommitTransactionRequest(List<UpdateTableRequest> tableChanges) {
this.tableChanges = tableChanges;
validate();
}

public List<UpdateTableRequest> tableChanges() {
return ImmutableList.copyOf(tableChanges);
}

@Override
public void validate() {
Preconditions.checkArgument(null != tableChanges, "Invalid table changes: null");
Preconditions.checkArgument(!tableChanges.isEmpty(), "Invalid table changes: empty");
for (UpdateTableRequest tableChange : tableChanges) {
Preconditions.checkArgument(
null != tableChange.identifier(), "Invalid table changes: table identifier required");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.requests;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.MetadataUpdateParser;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.TableIdentifierParser;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.JsonUtil;

public class CommitTransactionRequestParser {
private static final String TABLE_CHANGES = "table-changes";
private static final String IDENTIFIER = "identifier";
private static final String REQUIREMENTS = "requirements";
private static final String UPDATES = "updates";

private CommitTransactionRequestParser() {}

public static String toJson(CommitTransactionRequest request) {
return toJson(request, false);
}

public static String toJson(CommitTransactionRequest request, boolean pretty) {
return JsonUtil.generate(gen -> toJson(request, gen), pretty);
}

public static void toJson(CommitTransactionRequest request, JsonGenerator gen)
throws IOException {
Preconditions.checkArgument(null != request, "Invalid commit tx request: null");

gen.writeStartObject();
gen.writeFieldName(TABLE_CHANGES);
gen.writeStartArray();

for (UpdateTableRequest tableChange : request.tableChanges()) {
gen.writeStartObject();

gen.writeFieldName(IDENTIFIER);
TableIdentifierParser.toJson(tableChange.identifier(), gen);

gen.writeArrayFieldStart(REQUIREMENTS);
for (UpdateTableRequest.UpdateRequirement updateRequirement : tableChange.requirements()) {
UpdateRequirementParser.toJson(updateRequirement, gen);
}
gen.writeEndArray();

gen.writeArrayFieldStart(UPDATES);
for (MetadataUpdate metadataUpdate : tableChange.updates()) {
MetadataUpdateParser.toJson(metadataUpdate, gen);
}
gen.writeEndArray();

gen.writeEndObject();
}

gen.writeEndArray();
gen.writeEndObject();
}

public static CommitTransactionRequest fromJson(String json) {
return JsonUtil.parse(json, CommitTransactionRequestParser::fromJson);
}

public static CommitTransactionRequest fromJson(JsonNode json) {
Preconditions.checkArgument(null != json, "Cannot parse commit tx request from null object");

List<UpdateTableRequest> tableChanges = Lists.newArrayList();
JsonNode changes = JsonUtil.get(TABLE_CHANGES, json);

Preconditions.checkArgument(
changes.isArray(), "Cannot parse commit tx request from non-array: %s", changes);

for (JsonNode node : changes) {
TableIdentifier identifier = null;
List<UpdateTableRequest.UpdateRequirement> requirements = Lists.newArrayList();
List<MetadataUpdate> updates = Lists.newArrayList();

if (node.hasNonNull(IDENTIFIER)) {
identifier = TableIdentifierParser.fromJson(JsonUtil.get(IDENTIFIER, node));
}

if (node.hasNonNull(REQUIREMENTS)) {
JsonNode requirementsNode = JsonUtil.get(REQUIREMENTS, node);
Preconditions.checkArgument(
requirementsNode.isArray(),
"Cannot parse requirements from non-array: %s",
requirementsNode);
requirementsNode.forEach(req -> requirements.add(UpdateRequirementParser.fromJson(req)));
}

if (node.hasNonNull(UPDATES)) {
JsonNode updatesNode = JsonUtil.get(UPDATES, node);
Preconditions.checkArgument(
updatesNode.isArray(), "Cannot parse metadata updates from non-array: %s", updatesNode);

updatesNode.forEach(update -> updates.add(MetadataUpdateParser.fromJson(update)));
}

tableChanges.add(new UpdateTableRequest(identifier, requirements, updates));
}

return new CommitTransactionRequest(tableChanges);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -33,6 +34,7 @@

public class UpdateTableRequest implements RESTRequest {

private TableIdentifier identifier;
private List<UpdateRequirement> requirements;
private List<MetadataUpdate> updates;

Expand All @@ -45,6 +47,14 @@ public UpdateTableRequest(List<UpdateRequirement> requirements, List<MetadataUpd
this.updates = updates;
}

UpdateTableRequest(
TableIdentifier identifier,
List<UpdateRequirement> requirements,
List<MetadataUpdate> updates) {
this(requirements, updates);
this.identifier = identifier;
}

@Override
public void validate() {}

Expand All @@ -56,6 +66,10 @@ public List<MetadataUpdate> updates() {
return updates != null ? updates : ImmutableList.of();
}

public TableIdentifier identifier() {
return identifier;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand Down
Loading

0 comments on commit cdc1a27

Please sign in to comment.