diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java index 6a3dc23fd4c..13ed9387159 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java @@ -18,6 +18,7 @@ import static java.lang.String.format; import io.delta.kernel.exceptions.*; +import io.delta.kernel.internal.actions.DomainMetadata; import io.delta.kernel.types.DataType; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.DataFileStatus; @@ -274,6 +275,34 @@ public static KernelException invalidConfigurationValueException( return new InvalidConfigurationValueException(key, value, helpMessage); } + public static KernelException domainMetadataUnsupported() { + String message = + "Cannot commit DomainMetadata action(s) because the feature 'domainMetadata' " + + "is not supported on this table."; + return new KernelException(message); + } + + public static KernelException duplicateDomainMetadataAction( + String domain, DomainMetadata action1, DomainMetadata action2) { + String message = + String.format( + "Multiple actions detected for domain '%s' in single transaction: '%s' and '%s'. " + + "Only one action per domain is allowed.", + domain, action1.toString(), action2.toString()); + return new KernelException(message); + } + + public static ConcurrentWriteException concurrentDomainMetadataAction( + DomainMetadata domainMetadataAttempt, DomainMetadata winningDomainMetadata) { + String message = + String.format( + "A concurrent writer added a domainMetadata action for the same domain: %s. " + + "No domain-specific conflict resolution is available for this domain. " + + "Attempted domainMetadata: %s. 
Winning domainMetadata: %s", + domainMetadataAttempt.getDomain(), domainMetadataAttempt, winningDomainMetadata); + return new ConcurrentWriteException(message); + } + /* ------------------------ HELPER METHODS ----------------------------- */ private static String formatTimestamp(long millisSinceEpochUTC) { return new Timestamp(millisSinceEpochUTC).toInstant().toString(); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java index 67dca98e89a..5dd6cd2d2d4 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java @@ -23,6 +23,7 @@ import io.delta.kernel.engine.CommitCoordinatorClientHandler; import io.delta.kernel.engine.Engine; import io.delta.kernel.internal.actions.CommitInfo; +import io.delta.kernel.internal.actions.DomainMetadata; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.fs.Path; @@ -31,6 +32,7 @@ import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.snapshot.TableCommitCoordinatorClientHandler; import io.delta.kernel.types.StructType; +import java.util.Map; import java.util.Optional; /** Implementation of {@link Snapshot}. */ @@ -83,6 +85,17 @@ public Protocol getProtocol() { return protocol; } + /** + * Get the domain metadata map from the log replay, which lazily loads and replays a history of + * domain metadata actions, resolving them to produce the current state of the domain metadata. + * + * @return A map where the keys are domain names and the values are {@link DomainMetadata} + * objects. 
+ */ + public Map getDomainMetadataMap() { + return logReplay.getDomainMetadataMap(); + } + public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) { long minFileRetentionTimestampMillis = System.currentTimeMillis() - TOMBSTONE_RETENTION.fromMetadata(metadata); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java index eeeffc7cc7d..4cc2b84946d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java @@ -39,6 +39,7 @@ public class TableFeatures { add("columnMapping"); add("typeWidening-preview"); add("typeWidening"); + add(DOMAIN_METADATA_FEATURE_NAME); } }); @@ -57,6 +58,12 @@ public class TableFeatures { } }); + /** The feature name for domain metadata. */ + public static final String DOMAIN_METADATA_FEATURE_NAME = "domainMetadata"; + + /** The minimum writer version required to support domain metadata. */ + public static final int TABLE_FEATURES_MIN_WRITER_VERSION = 7; + //////////////////// // Helper Methods // //////////////////// @@ -93,7 +100,7 @@ public static void validateReadSupportedTable( *
<ul> *
  <li>protocol writer version 1. *
  <li>protocol writer version 2 only with appendOnly feature enabled. *
  <li>protocol writer version 7 with {@code appendOnly}, {@code inCommitTimestamp}, {@code - * columnMapping}, {@code typeWidening} feature enabled. + * columnMapping}, {@code typeWidening}, {@code domainMetadata} feature enabled. * </ul> * * @param protocol Table protocol @@ -125,20 +132,8 @@ public static void validateWriteSupportedTable( throw unsupportedWriterProtocol(tablePath, minWriterVersion); case 7: for (String writerFeature : protocol.getWriterFeatures()) { - switch (writerFeature) { - // Only supported writer features as of today in Kernel - case "appendOnly": - break; - case "inCommitTimestamp": - break; - case "columnMapping": - break; - case "typeWidening-preview": - break; - case "typeWidening": - break; - default: - throw unsupportedWriterFeature(tablePath, writerFeature); + if (!SUPPORTED_WRITER_FEATURES.contains(writerFeature)) { + throw unsupportedWriterFeature(tablePath, writerFeature); } } break; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 7b90d2be1d8..267517651c7 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -32,11 +32,7 @@ import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.replay.ConflictChecker; import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState; -import io.delta.kernel.internal.util.Clock; -import io.delta.kernel.internal.util.ColumnMapping; -import io.delta.kernel.internal.util.FileNames; -import io.delta.kernel.internal.util.InCommitTimestampUtils; -import io.delta.kernel.internal.util.VectorUtils; +import io.delta.kernel.internal.util.*; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterable; import io.delta.kernel.utils.CloseableIterator; @@ -73,6 +69,7 @@ public class TransactionImpl implements 
Transaction { private final Optional setTxnOpt; private final boolean shouldUpdateProtocol; private final Clock clock; + private final List domainMetadatas = new ArrayList<>(); private Metadata metadata; private boolean shouldUpdateMetadata; @@ -120,6 +117,23 @@ public StructType getSchema(Engine engine) { return readSnapshot.getSchema(engine); } + public Optional getSetTxnOpt() { + return setTxnOpt; + } + + /** + * Internal API to add domain metadata actions for this transaction. Visible for testing. + * + * @param domainMetadatas List of domain metadata to be added to the transaction. + */ + public void addDomainMetadatas(List domainMetadatas) { + this.domainMetadatas.addAll(domainMetadatas); + } + + public List getDomainMetadatas() { + return domainMetadatas; + } + @Override public TransactionCommitResult commit(Engine engine, CloseableIterable dataActions) throws ConcurrentWriteException { @@ -221,6 +235,12 @@ private TransactionCommitResult doCommit( } setTxnOpt.ifPresent(setTxn -> metadataActions.add(createTxnSingleAction(setTxn.toRow()))); + // Check for duplicate domain metadata and whether the protocol supports domain metadata + DomainMetadataUtils.validateDomainMetadatas(domainMetadatas, protocol); + + domainMetadatas.forEach( + dm -> metadataActions.add(createDomainMetadataSingleAction(dm.toRow()))); + try (CloseableIterator stageDataIter = dataActions.iterator()) { // Create a new CloseableIterator that will return the metadata actions followed by the // data actions. @@ -265,10 +285,6 @@ public boolean isBlindAppend() { return true; } - public Optional getSetTxnOpt() { - return setTxnOpt; - } - /** * Generates a timestamp which is greater than the commit timestamp of the readSnapshot. This can * result in an additional file read and that this will only happen if ICT is enabled. 
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java new file mode 100644 index 00000000000..77daa147315 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java @@ -0,0 +1,127 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.actions; + +import static io.delta.kernel.internal.util.InternalUtils.requireNonNull; +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.Row; +import io.delta.kernel.internal.data.GenericRow; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import java.util.HashMap; +import java.util.Map; + +/** Delta log action representing a `DomainMetadata` action */ +public class DomainMetadata { + /** Full schema of the {@link DomainMetadata} action in the Delta Log. 
*/ + public static final StructType FULL_SCHEMA = + new StructType() + .add("domain", StringType.STRING, false /* nullable */) + .add("configuration", StringType.STRING, false /* nullable */) + .add("removed", BooleanType.BOOLEAN, false /* nullable */); + + public static DomainMetadata fromColumnVector(ColumnVector vector, int rowId) { + if (vector.isNullAt(rowId)) { + return null; + } + return new DomainMetadata( + requireNonNull(vector.getChild(0), rowId, "domain").getString(rowId), + requireNonNull(vector.getChild(1), rowId, "configuration").getString(rowId), + requireNonNull(vector.getChild(2), rowId, "removed").getBoolean(rowId)); + } + + public static DomainMetadata fromRow(Row row) { + if (row == null) { + return null; + } + assert (row.getSchema().equals(FULL_SCHEMA)); + return new DomainMetadata( + requireNonNull(row, 0, "domain").getString(0), + requireNonNull(row, 1, "configuration").getString(1), + requireNonNull(row, 2, "removed").getBoolean(2)); + } + + private final String domain; + private final String configuration; + private final boolean removed; + + /** + * The domain metadata action contains a configuration string for a named metadata domain. Two + * overlapping transactions conflict if they both contain a domain metadata action for the same + * metadata domain. Per-domain conflict resolution logic can be implemented. + * + * @param domain A string used to identify a specific domain. + * @param configuration A string containing configuration for the metadata domain. + * @param removed If it is true it serves as a tombstone to logically delete a {@link + * DomainMetadata} action. 
+ */ + public DomainMetadata(String domain, String configuration, boolean removed) { + this.domain = requireNonNull(domain, "domain is null"); + this.configuration = requireNonNull(configuration, "configuration is null"); + this.removed = removed; + } + + public String getDomain() { + return domain; + } + + public String getConfiguration() { + return configuration; + } + + public boolean isRemoved() { + return removed; + } + + /** + * Encode as a {@link Row} object with the schema {@link DomainMetadata#FULL_SCHEMA}. + * + * @return {@link Row} object with the schema {@link DomainMetadata#FULL_SCHEMA} + */ + public Row toRow() { + Map domainMetadataMap = new HashMap<>(); + domainMetadataMap.put(0, domain); + domainMetadataMap.put(1, configuration); + domainMetadataMap.put(2, removed); + + return new GenericRow(DomainMetadata.FULL_SCHEMA, domainMetadataMap); + } + + @Override + public String toString() { + return String.format( + "DomainMetadata{domain='%s', configuration='%s', removed='%s'}", + domain, configuration, removed); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + DomainMetadata that = (DomainMetadata) obj; + return removed == that.removed + && domain.equals(that.domain) + && configuration.equals(that.configuration); + } + + @Override + public int hashCode() { + return java.util.Objects.hash(domain, configuration, removed); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java index 94626ea5f7d..1f32226a0ec 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java @@ -18,6 +18,7 @@ import io.delta.kernel.data.Row; import io.delta.kernel.internal.data.GenericRow; import 
io.delta.kernel.types.StructType; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -32,7 +33,9 @@ public class SingleAction { .add("add", AddFile.FULL_SCHEMA) .add("remove", RemoveFile.FULL_SCHEMA) .add("metaData", Metadata.FULL_SCHEMA) - .add("protocol", Protocol.FULL_SCHEMA); + .add("protocol", Protocol.FULL_SCHEMA) + .add("domainMetadata", DomainMetadata.FULL_SCHEMA); + // Once we start supporting updating CDC or domain metadata enabled tables, we should add the // schema for those fields here. @@ -48,7 +51,9 @@ public class SingleAction { // .add("remove", RemoveFile.FULL_SCHEMA) // not needed for blind appends .add("metaData", Metadata.FULL_SCHEMA) .add("protocol", Protocol.FULL_SCHEMA) - .add("commitInfo", CommitInfo.FULL_SCHEMA); + .add("commitInfo", CommitInfo.FULL_SCHEMA) + .add("domainMetadata", DomainMetadata.FULL_SCHEMA); + // Once we start supporting domain metadata/row tracking enabled tables, we should add the // schema for domain metadata fields here. @@ -61,7 +66,8 @@ public class SingleAction { .add("metaData", Metadata.FULL_SCHEMA) .add("protocol", Protocol.FULL_SCHEMA) .add("cdc", new StructType()) - .add("commitInfo", CommitInfo.FULL_SCHEMA); + .add("commitInfo", CommitInfo.FULL_SCHEMA) + .add("domainMetadata", DomainMetadata.FULL_SCHEMA); // Once we start supporting updating CDC or domain metadata enabled tables, we should add the // schema for those fields here. 
@@ -71,6 +77,7 @@ public class SingleAction { private static final int METADATA_ORDINAL = FULL_SCHEMA.indexOf("metaData"); private static final int PROTOCOL_ORDINAL = FULL_SCHEMA.indexOf("protocol"); private static final int COMMIT_INFO_ORDINAL = FULL_SCHEMA.indexOf("commitInfo"); + private static final int DOMAIN_METADATA_ORDINAL = FULL_SCHEMA.indexOf("domainMetadata"); public static Row createAddFileSingleAction(Row addFile) { Map singleActionValueMap = new HashMap<>(); @@ -102,6 +109,11 @@ public static Row createCommitInfoSingleAction(Row commitInfo) { return new GenericRow(FULL_SCHEMA, singleActionValueMap); } + public static Row createDomainMetadataSingleAction(Row domainMetadata) { + return new GenericRow( + FULL_SCHEMA, Collections.singletonMap(DOMAIN_METADATA_ORDINAL, domainMetadata)); + } + public static Row createTxnSingleAction(Row txn) { Map singleActionValueMap = new HashMap<>(); singleActionValueMap.put(TXN_ORDINAL, txn); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metadatadomain/JsonMetadataDomain.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metadatadomain/JsonMetadataDomain.java new file mode 100644 index 00000000000..0711b215d47 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metadatadomain/JsonMetadataDomain.java @@ -0,0 +1,100 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.internal.metadatadomain; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.*; +import io.delta.kernel.exceptions.KernelException; +import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.internal.actions.DomainMetadata; +import io.delta.kernel.internal.rowtracking.RowTrackingMetadataDomain; +import java.util.Optional; + +/** + * Abstract class representing a JSON metadata domain, whose configuration string is a JSON + * serialization of a domain object. This class provides methods to serialize and deserialize a + * metadata domain to and from JSON. Concrete implementations, such as {@link + * RowTrackingMetadataDomain}, should extend this class to define a specific metadata domain. + */ +public abstract class JsonMetadataDomain { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + /** + * Deserializes a JSON string into an instance of the specified metadata domain. + * + * @param json the JSON string to deserialize + * @param clazz the concrete class of the metadata domain object to deserialize into + * @param the type of the object + * @return the deserialized object + * @throws KernelException if the JSON string cannot be parsed + */ + protected static T fromJsonConfiguration(String json, Class clazz) { + try { + return OBJECT_MAPPER.readValue(json, clazz); + } catch (JsonProcessingException e) { + throw new KernelException("Could not parse a JSON to a JsonMetadataDomain object", e); + } + } + + /** + * Retrieves the domain metadata from a snapshot for a given domain, and deserializes it into an + * instance of the specified metadata domain class. 
+ * + * @param snapshot the snapshot to read from + * @param clazz the metadata domain class of the object to deserialize into + * @param domainName the name of the domain + * @param the type of the metadata domain object + * @return an Optional containing the deserialized object if the domain metadata is found, + * otherwise an empty Optional + */ + protected static Optional fromSnapshot( + SnapshotImpl snapshot, Class clazz, String domainName) { + return Optional.ofNullable(snapshot.getDomainMetadataMap().get(domainName)) + .map(domainMetadata -> fromJsonConfiguration(domainMetadata.getConfiguration(), clazz)); + } + + /** + * Returns the name of the domain. + * + * @return the domain name + */ + @JsonIgnore + public abstract String getDomainName(); + + /** + * Serializes this object into a JSON string. + * + * @return the JSON string representation of this object + * @throws KernelException if the object cannot be serialized + */ + public String toJsonConfiguration() { + try { + return OBJECT_MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new KernelException("Could not serialize a JsonMetadataDomain object to JSON", e); + } + } + + /** + * Generate a {@link DomainMetadata} action from this metadata domain. 
+ * + * @return the DomainMetadata action instance + */ + public DomainMetadata toDomainMetadata() { + return new DomainMetadata(getDomainName(), toJsonConfiguration(), false); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java index 1846fd22590..33808db8be4 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java @@ -15,9 +15,10 @@ */ package io.delta.kernel.internal.replay; +import static io.delta.kernel.internal.DeltaErrors.concurrentDomainMetadataAction; import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO; import static io.delta.kernel.internal.TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED; -import static io.delta.kernel.internal.actions.SingleAction.CONFLICT_RESOLUTION_SCHEMA; +import static io.delta.kernel.internal.actions.SingleAction.*; import static io.delta.kernel.internal.util.FileNames.deltaFile; import static io.delta.kernel.internal.util.Preconditions.checkArgument; import static io.delta.kernel.internal.util.Preconditions.checkState; @@ -29,7 +30,9 @@ import io.delta.kernel.exceptions.ConcurrentWriteException; import io.delta.kernel.internal.*; import io.delta.kernel.internal.actions.CommitInfo; +import io.delta.kernel.internal.actions.DomainMetadata; import io.delta.kernel.internal.actions.SetTransaction; +import io.delta.kernel.internal.util.DomainMetadataUtils; import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; @@ -48,6 +51,8 @@ public class ConflictChecker { private static final int METADATA_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("metaData"); private static final int TXN_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("txn"); private static final int COMMITINFO_ORDINAL 
= CONFLICT_RESOLUTION_SCHEMA.indexOf("commitInfo"); + private static final int DOMAIN_METADATA_ORDINAL = + CONFLICT_RESOLUTION_SCHEMA.indexOf("domainMetadata"); // Snapshot of the table read by the transaction that encountered the conflict // (a.k.a the losing transaction) @@ -109,6 +114,7 @@ public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentW handleProtocol(batch.getColumnVector(PROTOCOL_ORDINAL)); handleMetadata(batch.getColumnVector(METADATA_ORDINAL)); handleTxn(batch.getColumnVector(TXN_ORDINAL)); + handleDomainMetadata(batch.getColumnVector(DOMAIN_METADATA_ORDINAL)); }); } catch (IOException ioe) { throw new UncheckedIOException("Error reading actions from winning commits.", ioe); @@ -191,6 +197,39 @@ private void handleMetadata(ColumnVector metadataVector) { } } + /** + * Checks whether each of the current transaction's {@link DomainMetadata} conflicts with the + * winning transaction at any domain. + * + *
<ol> + *
  <li>Accept the current transaction if its set of metadata domains does not overlap with the + * winning transaction's set of metadata domains. + *
  <li>Otherwise, fail the current transaction unless each conflicting domain is associated with + * a domain-specific way of resolving the conflict. + * </ol>
    + * + * @param domainMetadataVector domainMetadata rows from the winning transactions + */ + private void handleDomainMetadata(ColumnVector domainMetadataVector) { + // Build a domain metadata map from the winning transaction. + Map winningTxnDomainMetadataMap = new HashMap<>(); + DomainMetadataUtils.populateDomainMetadataMap( + domainMetadataVector, winningTxnDomainMetadataMap); + + for (DomainMetadata currentTxnDM : this.transaction.getDomainMetadatas()) { + // For each domain metadata action in the current transaction, check if it has a conflict with + // the winning transaction. + String domainName = currentTxnDM.getDomain(); + DomainMetadata winningTxnDM = winningTxnDomainMetadataMap.get(domainName); + if (winningTxnDM != null) { + // Conflict - check if the conflict can be resolved. + // Currently, we don't have any domain-specific way of resolving the conflict. + // Domain-specific ways of resolving the conflict can be added here (e.g. for Row Tracking). + throw concurrentDomainMetadataAction(currentTxnDM, winningTxnDM); + } + } + } + /** * Get the commit info from the winning transactions. * diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CreateCheckpointIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CreateCheckpointIterator.java index d1b3d55ee72..11ff3879aa1 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CreateCheckpointIterator.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CreateCheckpointIterator.java @@ -78,6 +78,8 @@ public class CreateCheckpointIterator implements CloseableIterator txnAppIdToVersion = new HashMap<>(); + // Current state of all domains we have seen in {@link DomainMetadata} during the log replay. We + // traverse the log in reverse, so remembering the domains we have seen is enough for creating a + // checkpoint. 
+ private final Set domainSeen = new HashSet<>(); + // Metadata about the checkpoint to store in `_last_checkpoint` file private long numberOfAddActions = 0; // final number of add actions survived in the checkpoint @@ -234,6 +241,11 @@ private boolean prepareNext() { final ColumnVector txnVector = getVector(actionsBatch, TXN_ORDINAL); processTxn(txnVector, selectionVectorBuffer); + // Step 5: Process the domain metadata + final ColumnVector domainMetadataDomainNameVector = + getVector(actionsBatch, DOMAIN_METADATA_DOMAIN_NAME_ORDINAL); + processDomainMetadata(domainMetadataDomainNameVector, selectionVectorBuffer); + Optional selectionVector = Optional.of(createSelectionVector(selectionVectorBuffer, actionsBatch.getSize())); toReturnNext = Optional.of(new FilteredColumnarBatch(actionsBatch, selectionVector)); @@ -352,6 +364,37 @@ private void processTxn(ColumnVector txnVector, boolean[] selectionVectorBuffer) } } + /** + * Processes domain metadata actions during checkpoint creation. During the reverse log replay, + * for each domain, we only keep the first (latest) domain metadata action encountered by + * selecting them in the selection vector, and ignore any older ones for the same domain by + * unselecting them. + * + * @param domainMetadataVector Column vector containing domain names of domain metadata actions. + * @param selectionVectorBuffer The selection vector to attach to the batch to indicate which + * records to write to the checkpoint and which ones not to. + */ + private void processDomainMetadata( + ColumnVector domainMetadataVector, boolean[] selectionVectorBuffer) { + final int vectorSize = domainMetadataVector.getSize(); + for (int rowId = 0; rowId < vectorSize; rowId++) { + if (domainMetadataVector.isNullAt(rowId)) { + continue; // selectionVector will be `false` at rowId by default + } + + final String domain = domainMetadataVector.getString(rowId); + if (domainSeen.contains(domain)) { + // We do a reverse log replay. 
The latest domainMetadata seen for a given domain wins and + // should be written to the checkpoint. Anything after the first one shouldn't be in + // checkpoint. + unselect(selectionVectorBuffer, rowId); + } else { + select(selectionVectorBuffer, rowId); + domainSeen.add(domain); + } + } + } + private void unselect(boolean[] selectionVectorBuffer, int rowId) { // Just use the java assert (which are enabled in tests) for sanity checks. This should // never happen. Given this is going to be on the hot path, we want to avoid cost in diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java index 170dad9ac1f..f81ce16c99b 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java @@ -27,13 +27,18 @@ import io.delta.kernel.internal.actions.*; import io.delta.kernel.internal.checkpoints.SidecarFile; import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.lang.Lazy; import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.snapshot.SnapshotHint; +import io.delta.kernel.internal.util.DomainMetadataUtils; import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.types.StringType; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; /** @@ -74,6 +79,10 @@ private static StructType getAddSchema(boolean shouldReadStats) { return shouldReadStats ? 
AddFile.SCHEMA_WITH_STATS : AddFile.SCHEMA_WITHOUT_STATS; } + /** Read schema when searching for just the domain metadata */ + public static final StructType DOMAIN_METADATA_READ_SCHEMA = + new StructType().add("domainMetadata", DomainMetadata.FULL_SCHEMA); + public static String SIDECAR_FIELD_NAME = "sidecar"; public static String ADDFILE_FIELD_NAME = "add"; public static String REMOVEFILE_FIELD_NAME = "remove"; @@ -109,6 +118,7 @@ public static StructType getAddRemoveReadSchema(boolean shouldReadStats) { private final Path dataPath; private final LogSegment logSegment; private final Tuple2 protocolAndMetadata; + private final Lazy> domainMetadataMap; public LogReplay( Path logPath, @@ -122,6 +132,8 @@ public LogReplay( this.dataPath = dataPath; this.logSegment = logSegment; this.protocolAndMetadata = loadTableProtocolAndMetadata(engine, snapshotHint, snapshotVersion); + // Lazy loading of domain metadata only when needed + this.domainMetadataMap = new Lazy<>(() -> loadDomainMetadataMap(engine)); } ///////////////// @@ -140,6 +152,10 @@ public Optional getLatestTransactionIdentifier(Engine engine, String appli return loadLatestTransactionVersion(engine, applicationId); } + public Map getDomainMetadataMap() { + return domainMetadataMap.get(); + } + /** * Returns an iterator of {@link FilteredColumnarBatch} representing all the active AddFiles in * the table. @@ -296,4 +312,40 @@ private Optional loadLatestTransactionVersion(Engine engine, String applic return Optional.empty(); } + + /** + * Retrieves a map of domainName to {@link DomainMetadata} from the log files. + * + *

    Loading domain metadata requires an additional round of log replay so this is done lazily + * only when domain metadata is requested. We might want to merge this into {@link + * #loadTableProtocolAndMetadata}. + * + * @param engine The engine used to process the log files. + * @return A map where the keys are domain names and the values are the corresponding {@link + * DomainMetadata} objects. + * @throws UncheckedIOException if an I/O error occurs while closing the iterator. + */ + private Map loadDomainMetadataMap(Engine engine) { + try (CloseableIterator reverseIter = + new ActionsIterator( + engine, + logSegment.allLogFilesReversed(), + DOMAIN_METADATA_READ_SCHEMA, + Optional.empty() /* checkpointPredicate */)) { + Map domainMetadataMap = new HashMap<>(); + while (reverseIter.hasNext()) { + final ColumnarBatch columnarBatch = reverseIter.next().getColumnarBatch(); + assert (columnarBatch.getSchema().equals(DOMAIN_METADATA_READ_SCHEMA)); + + final ColumnVector dmVector = columnarBatch.getColumnVector(0); + + // We are performing a reverse log replay. This function ensures that only the first + // encountered domain metadata for each domain is added to the map. + DomainMetadataUtils.populateDomainMetadataMap(dmVector, domainMetadataMap); + } + return domainMetadataMap; + } catch (IOException ex) { + throw new UncheckedIOException("Could not close iterator", ex); + } + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTrackingMetadataDomain.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTrackingMetadataDomain.java new file mode 100644 index 00000000000..691e18d36b0 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTrackingMetadataDomain.java @@ -0,0 +1,77 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.rowtracking; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.internal.metadatadomain.JsonMetadataDomain; +import java.util.Optional; + +/** Represents the metadata domain for row tracking. */ +public class RowTrackingMetadataDomain extends JsonMetadataDomain { + + /** + * Creates an instance of {@link RowTrackingMetadataDomain} from a JSON configuration string. + * + * @param json the JSON configuration string + * @return an instance of {@link RowTrackingMetadataDomain} + */ + public static RowTrackingMetadataDomain fromJsonConfiguration(String json) { + return JsonMetadataDomain.fromJsonConfiguration(json, RowTrackingMetadataDomain.class); + } + + /** + * Creates an instance of {@link RowTrackingMetadataDomain} from a {@link SnapshotImpl}. 
+ * + * @param snapshot the snapshot instance + * @return an {@link Optional} containing the {@link RowTrackingMetadataDomain} if present + */ + public static Optional<RowTrackingMetadataDomain> fromSnapshot(SnapshotImpl snapshot) { + return JsonMetadataDomain.fromSnapshot(snapshot, RowTrackingMetadataDomain.class, DOMAIN_NAME); + } + + public static final String DOMAIN_NAME = "delta.rowTracking"; + + /** Default value for row ID high watermark when it is missing in the table */ + public static final long MISSING_ROW_ID_HIGH_WATERMARK = -1L; + + /** The highest assigned fresh row id for the table */ + private long rowIdHighWaterMark; + + /** + * Constructs a RowTrackingMetadataDomain with the specified row ID high water mark. + * + * @param rowIdHighWaterMark the row ID high water mark + */ + @JsonCreator + public RowTrackingMetadataDomain(@JsonProperty("rowIdHighWaterMark") long rowIdHighWaterMark) { + this.rowIdHighWaterMark = rowIdHighWaterMark; + } + + @Override + public String getDomainName() { + return DOMAIN_NAME; + } + + public long getRowIdHighWaterMark() { + return rowIdHighWaterMark; + } + + public void setRowIdHighWaterMark(long rowIdHighWaterMark) { + this.rowIdHighWaterMark = rowIdHighWaterMark; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/DomainMetadataUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/DomainMetadataUtils.java new file mode 100644 index 00000000000..97a0d85aa73 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/DomainMetadataUtils.java @@ -0,0 +1,102 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.internal.util; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.internal.DeltaErrors; +import io.delta.kernel.internal.TableFeatures; +import io.delta.kernel.internal.actions.DomainMetadata; +import io.delta.kernel.internal.actions.Protocol; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DomainMetadataUtils { + + private DomainMetadataUtils() { + // Empty private constructor to prevent instantiation + } + + /** + * Populate the map of domain metadata from actions. When encountering duplicate domain metadata + * actions for the same domain, this method preserves the first seen entry and skips subsequent + * entries. This behavior is what a reverse log replay needs: the entry encountered first is the + * most recent one, so it must take precedence over entries encountered later.
+ * + * @param domainMetadataActionVector A {@link ColumnVector} containing the domain metadata rows + * @param domainMetadataMap The existing map to be populated with domain metadata entries, where + * the key is the domain name and the value is the domain metadata + */ + public static void populateDomainMetadataMap( + ColumnVector domainMetadataActionVector, Map<String, DomainMetadata> domainMetadataMap) { + final int vectorSize = domainMetadataActionVector.getSize(); + for (int rowId = 0; rowId < vectorSize; rowId++) { + DomainMetadata dm = DomainMetadata.fromColumnVector(domainMetadataActionVector, rowId); + if (dm != null && !domainMetadataMap.containsKey(dm.getDomain())) { + // We only add the domain metadata if its domain name is not already present in the map + domainMetadataMap.put(dm.getDomain(), dm); + } + } + } + + /** + * Checks if the table protocol supports the "domainMetadata" writer feature. + * + * @param protocol the protocol to check + * @return true if the "domainMetadata" feature is supported, false otherwise + */ + public static boolean isDomainMetadataSupported(Protocol protocol) { + List<String> writerFeatures = protocol.getWriterFeatures(); + if (writerFeatures == null) { + return false; + } + return writerFeatures.contains(TableFeatures.DOMAIN_METADATA_FEATURE_NAME) + && protocol.getMinWriterVersion() >= TableFeatures.TABLE_FEATURES_MIN_WRITER_VERSION; + } + + /** + * Validates the list of domain metadata actions before committing them. It ensures that + * + * <ol> + * <li>domain metadata actions are only present when supported by the table protocol + * <li>there are no duplicate domain metadata actions for the same domain in the provided + * actions. + * </ol> + * + * @param domainMetadataActions The list of domain metadata actions to validate + * @param protocol The protocol to check for domain metadata support + */ + public static void validateDomainMetadatas( + List<DomainMetadata> domainMetadataActions, Protocol protocol) { + if (domainMetadataActions.isEmpty()) return; + + // The list of domain metadata is non-empty, so the protocol must support domain metadata + if (!isDomainMetadataSupported(protocol)) { + throw DeltaErrors.domainMetadataUnsupported(); + } + + Map<String, DomainMetadata> domainMetadataMap = new HashMap<>(); + for (DomainMetadata domainMetadata : domainMetadataActions) { + String domain = domainMetadata.getDomain(); + if (domainMetadataMap.containsKey(domain)) { + throw DeltaErrors.duplicateDomainMetadataAction( + domain, domainMetadataMap.get(domain), domainMetadata); + } + domainMetadataMap.put(domain, domainMetadata); + } + } +} diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala index 2dfb034ea99..a98d39840d0 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala @@ -68,7 +68,8 @@ class TableFeaturesSuite extends AnyFunSuite { checkSupported(createTestProtocol(minWriterVersion = 7)) } - Seq("appendOnly", "inCommitTimestamp", "columnMapping", "typeWidening-preview", "typeWidening") + Seq("appendOnly", "inCommitTimestamp", "columnMapping", "typeWidening-preview", "typeWidening", + "domainMetadata") .foreach { supportedWriterFeature => test(s"validateWriteSupported: protocol 7 with $supportedWriterFeature") { checkSupported(createTestProtocol(minWriterVersion = 7, supportedWriterFeature)) @@ -77,7 +78,7 @@ class TableFeaturesSuite extends AnyFunSuite { Seq("invariants", "checkConstraints", "generatedColumns", "allowColumnDefaults", "changeDataFeed", "identityColumns", 
"deletionVectors", "rowTracking", "timestampNtz", - "domainMetadata", "v2Checkpoint", "icebergCompatV1", "icebergCompatV2", "clustering", + "v2Checkpoint", "icebergCompatV1", "icebergCompatV2", "clustering", "vacuumProtocolCheck").foreach { unsupportedWriterFeature => test(s"validateWriteSupported: protocol 7 with $unsupportedWriterFeature") { checkUnsupported(createTestProtocol(minWriterVersion = 7, unsupportedWriterFeature)) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala new file mode 100644 index 00000000000..96cfc515ff8 --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala @@ -0,0 +1,523 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.defaults + +import io.delta.kernel._ +import io.delta.kernel.defaults.internal.parquet.ParquetSuiteBase +import io.delta.kernel.engine.Engine +import io.delta.kernel.exceptions._ +import io.delta.kernel.internal.{SnapshotImpl, TableImpl, TransactionBuilderImpl, TransactionImpl} +import io.delta.kernel.internal.actions.{DomainMetadata, Protocol, SingleAction} +import io.delta.kernel.internal.util.Utils.toCloseableIterator +import io.delta.kernel.internal.rowtracking.RowTrackingMetadataDomain +import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable} +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.delta.actions.{DomainMetadata => SparkDomainMetadata} +import org.apache.spark.sql.delta.test.DeltaTestImplicits.OptimisticTxnTestHelper + +import java.util.Collections +import scala.collection.JavaConverters._ +import scala.collection.immutable.Seq + +class DomainMetadataSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase { + + private def assertDomainMetadata( + snapshot: SnapshotImpl, + expectedValue: Map[String, DomainMetadata]): Unit = { + assert(expectedValue === snapshot.getDomainMetadataMap.asScala) + } + + private def assertDomainMetadata( + table: Table, + engine: Engine, + expectedValue: Map[String, DomainMetadata]): Unit = { + // Get the latest snapshot of the table + val snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + assertDomainMetadata(snapshot, expectedValue) + } + + private def createTxnWithDomainMetadatas( + engine: Engine, + tablePath: String, + domainMetadatas: Seq[DomainMetadata]): Transaction = { + + val txnBuilder = createWriteTxnBuilder(TableImpl.forPath(engine, tablePath)) + .asInstanceOf[TransactionBuilderImpl] + + val txn = txnBuilder.build(engine).asInstanceOf[TransactionImpl] + txn.addDomainMetadatas(domainMetadatas.asJava) + txn + } + + private def commitDomainMetadataAndVerify( + engine: Engine, + 
tablePath: String, + domainMetadatas: Seq[DomainMetadata], + expectedValue: Map[String, DomainMetadata]): Unit = { + // Create the transaction with domain metadata and commit + val txn = createTxnWithDomainMetadatas(engine, tablePath, domainMetadatas) + txn.commit(engine, emptyIterable()) + + // Verify the final state includes the expected domain metadata + val table = Table.forPath(engine, tablePath) + assertDomainMetadata(table, engine, expectedValue) + } + + private def setDomainMetadataSupport(engine: Engine, tablePath: String): Unit = { + val protocol = new Protocol( + 1, // minReaderVersion + 7, // minWriterVersion + Collections.emptyList(), // readerFeatures + Seq("domainMetadata").asJava // writerFeatures + ) + + val protocolAction = SingleAction.createProtocolSingleAction(protocol.toRow) + val txn = createTxn(engine, tablePath, isNewTable = false, testSchema, Seq.empty) + txn.commit(engine, inMemoryIterable(toCloseableIterator(Seq(protocolAction).asJava.iterator()))) + } + + private def createTableWithDomainMetadataSupported(engine: Engine, tablePath: String): Unit = { + // Create an empty table + createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty) + .commit(engine, emptyIterable()) + + // Set writer version and writer feature to support domain metadata + setDomainMetadataSupport(engine, tablePath) + } + + private def validateDomainMetadataConflictResolution( + engine: Engine, + tablePath: String, + currentTxn1DomainMetadatas: Seq[DomainMetadata], + winningTxn2DomainMetadatas: Seq[DomainMetadata], + winningTxn3DomainMetadatas: Seq[DomainMetadata], + expectedConflict: Boolean): Unit = { + // Create table with domain metadata support + createTableWithDomainMetadataSupported(engine, tablePath) + val table = Table.forPath(engine, tablePath) + + /** + * Txn1: i.e. the current transaction that comes later than winning transactions. + * Txn2: i.e. the winning transaction that was committed first. + * Txn3: i.e. 
the winning transaction that was committed second. + * + * Note: each tN below is a point in time; a larger N is later. + * + * t1 ------------------------ Txn1 starts. + * t2 ------- Txn2 starts. + * t3 ------- Txn2 commits. + * t4 ------- Txn3 starts. + * t5 ------- Txn3 commits. + * t6 ------------------------ Txn1 commits (SUCCESS or FAIL). + */ + val txn1 = createTxnWithDomainMetadatas(engine, tablePath, currentTxn1DomainMetadatas) + + val txn2 = createTxnWithDomainMetadatas(engine, tablePath, winningTxn2DomainMetadatas) + txn2.commit(engine, emptyIterable()) + + val txn3 = createTxnWithDomainMetadatas(engine, tablePath, winningTxn3DomainMetadatas) + txn3.commit(engine, emptyIterable()) + + if (expectedConflict) { + // We expect the commit of txn1 to fail because of the conflicting DM actions + val ex = intercept[KernelException] { + txn1.commit(engine, emptyIterable()) + } + assert( + ex.getMessage.contains( + "A concurrent writer added a domainMetadata action for the same domain" + ) + ) + } else { + // We expect the commit of txn1 to succeed + txn1.commit(engine, emptyIterable()) + // Verify the final state includes merged domain metadata + val expectedMetadata = + (winningTxn2DomainMetadatas ++ winningTxn3DomainMetadatas ++ currentTxn1DomainMetadatas) + .groupBy(_.getDomain) + .mapValues(_.last) + assertDomainMetadata(table, engine, expectedMetadata) + } + } + + test("create table w/o domain metadata") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + + // Create an empty table + createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty) + .commit(engine, emptyIterable()) + + // Verify that the table doesn't have any domain metadata + assertDomainMetadata(table, engine, Map.empty) + } + } + + test("table w/o domain metadata support fails domain metadata commits") { + withTempDirAndEngine { (tablePath, engine) => + // Create an empty table + // Its minWriterVersion is 2 and doesn't have 'domainMetadata' in its writerFeatures + 
createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty) + .commit(engine, emptyIterable()) + + val dm1 = new DomainMetadata("domain1", "", false) + val txn1 = createTxnWithDomainMetadatas(engine, tablePath, List(dm1)) + + // We expect the commit to fail because the table doesn't support domain metadata + val e = intercept[KernelException] { + txn1.commit(engine, emptyIterable()) + } + assert( + e.getMessage + .contains( + "Cannot commit DomainMetadata action(s) because the feature 'domainMetadata' " + + "is not supported on this table." + ) + ) + + // Set writer version and writer feature to support domain metadata + setDomainMetadataSupport(engine, tablePath) + + // Commit domain metadata again and expect success + val txn2 = createTxnWithDomainMetadatas(engine, tablePath, List(dm1)) + txn2.commit(engine, emptyIterable()) + } + } + + test("multiple DomainMetadatas for the same domain should fail in single transaction") { + withTempDirAndEngine { (tablePath, engine) => + createTableWithDomainMetadataSupported(engine, tablePath) + + val dm1_1 = new DomainMetadata("domain1", """{"key1":"1"}""", false) + val dm2 = new DomainMetadata("domain2", "", false) + val dm1_2 = new DomainMetadata("domain1", """{"key1":"10"}"""", false) + + val txn = createTxnWithDomainMetadatas(engine, tablePath, List(dm1_1, dm2, dm1_2)) + + val e = intercept[KernelException] { + txn.commit(engine, emptyIterable()) + } + assert( + e.getMessage.contains( + "Multiple actions detected for domain 'domain1' in single transaction" + ) + ) + } + } + + test("latest domain metadata overwriting existing ones") { + withTempDirAndEngine { (tablePath, engine) => + createTableWithDomainMetadataSupported(engine, tablePath) + + val dm1 = new DomainMetadata("domain1", """{"key1":"1"}, {"key2":"2"}""", false) + val dm2 = new DomainMetadata("domain2", "", false) + val dm3 = new DomainMetadata("domain3", """{"key3":"3"}""", false) + + val dm1_2 = new DomainMetadata("domain1", """{"key1":"10"}""", 
false) + val dm3_2 = new DomainMetadata("domain3", """{"key3":"30"}""", false) + + Seq( + (Seq(dm1), Map("domain1" -> dm1)), + (Seq(dm2, dm3, dm1_2), Map("domain1" -> dm1_2, "domain2" -> dm2, "domain3" -> dm3)), + (Seq(dm3_2), Map("domain1" -> dm1_2, "domain2" -> dm2, "domain3" -> dm3_2)) + ).foreach { + case (domainMetadatas, expectedValue) => + commitDomainMetadataAndVerify(engine, tablePath, domainMetadatas, expectedValue) + } + } + } + + test("domain metadata persistence across log replay") { + withTempDirAndEngine { (tablePath, engine) => + createTableWithDomainMetadataSupported(engine, tablePath) + + val dm1 = new DomainMetadata("domain1", """{"key1":"1"}, {"key2":"2"}""", false) + val dm2 = new DomainMetadata("domain2", "", false) + + commitDomainMetadataAndVerify( + engine, + tablePath, + domainMetadatas = Seq(dm1, dm2), + expectedValue = Map("domain1" -> dm1, "domain2" -> dm2) + ) + + // Restart the table and verify the domain metadata + val table2 = Table.forPath(engine, tablePath) + assertDomainMetadata(table2, engine, Map("domain1" -> dm1, "domain2" -> dm2)) + } + } + + test("only the latest domain metadata per domain is stored in checkpoints") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + createTableWithDomainMetadataSupported(engine, tablePath) + + val dm1 = new DomainMetadata("domain1", """{"key1":"1"}, {"key2":"2"}""", false) + val dm2 = new DomainMetadata("domain2", "", false) + val dm3 = new DomainMetadata("domain3", """{"key3":"3"}""", false) + val dm1_2 = new DomainMetadata("domain1", """{"key1":"10"}""", false) + val dm3_2 = new DomainMetadata("domain3", """{"key3":"30"}""", true) + + Seq( + (Seq(dm1), Map("domain1" -> dm1)), + (Seq(dm2), Map("domain1" -> dm1, "domain2" -> dm2)), + (Seq(dm3), Map("domain1" -> dm1, "domain2" -> dm2, "domain3" -> dm3)), + (Seq(dm1_2, dm3_2), Map("domain1" -> dm1_2, "domain2" -> dm2, "domain3" -> dm3_2)) + ).foreach { + case (domainMetadatas, expectedValue) => + 
commitDomainMetadataAndVerify(engine, tablePath, domainMetadatas, expectedValue) + } + + // Checkpoint the table + val latestVersion = table.getLatestSnapshot(engine).getVersion(engine) + table.checkpoint(engine, latestVersion) + + // Verify that only the latest domain metadata is persisted in the checkpoint + val table2 = Table.forPath(engine, tablePath) + assertDomainMetadata( + table2, + engine, + Map("domain1" -> dm1_2, "domain2" -> dm2, "domain3" -> dm3_2) + ) + } + } + + test("Conflict resolution - one of three concurrent txns has DomainMetadata") { + withTempDirAndEngine { (tablePath, engine) => + /** + * Txn1: include DomainMetadata action. + * Txn2: does NOT include DomainMetadata action. + * Txn3: does NOT include DomainMetadata action. + * + * t1 ------------------------ Txn1 starts. + * t2 ------- Txn2 starts. + * t3 ------- Txn2 commits. + * t4 ------- Txn3 starts. + * t5 ------- Txn3 commits. + * t6 ------------------------ Txn1 commits (SUCCESS). + */ + val dm1 = new DomainMetadata("domain1", "", false) + + validateDomainMetadataConflictResolution( + engine, + tablePath, + currentTxn1DomainMetadatas = Seq(dm1), + winningTxn2DomainMetadatas = Seq.empty, + winningTxn3DomainMetadatas = Seq.empty, + expectedConflict = false + ) + } + } + + test( + "Conflict resolution - three concurrent txns have DomainMetadata w/o conflicting domains" + ) { + withTempDirAndEngine { (tablePath, engine) => + /** + * Txn1: include DomainMetadata action for "domain1". + * Txn2: include DomainMetadata action for "domain2". + * Txn3: include DomainMetadata action for "domain3". + * + * t1 ------------------------ Txn1 starts. + * t2 ------- Txn2 starts. + * t3 ------- Txn2 commits. + * t4 ------- Txn3 starts. + * t5 ------- Txn3 commits. + * t6 ------------------------ Txn1 commits (SUCCESS). 
+ */ + val dm1 = new DomainMetadata("domain1", "", false) + val dm2 = new DomainMetadata("domain2", "", false) + val dm3 = new DomainMetadata("domain3", "", false) + + validateDomainMetadataConflictResolution( + engine, + tablePath, + currentTxn1DomainMetadatas = Seq(dm1), + winningTxn2DomainMetadatas = Seq(dm2), + winningTxn3DomainMetadatas = Seq(dm3), + expectedConflict = false + ) + } + } + + test( + "Conflict resolution - three concurrent txns have DomainMetadata w/ conflicting domains" + ) { + withTempDirAndEngine { (tablePath, engine) => + /** + * Txn1: include DomainMetadata action for "domain1". + * Txn2: include DomainMetadata action for "domain2". + * Txn3: include DomainMetadata action for "domain1". + * + * t1 ------------------------ Txn1 starts. + * t2 ------- Txn2 starts. + * t3 ------- Txn2 commits. + * t4 ------- Txn3 starts. + * t5 ------- Txn3 commits. + * t6 ------------------------ Txn1 commits (FAIL). + */ + val dm1 = new DomainMetadata("domain1", "", false) + val dm2 = new DomainMetadata("domain2", "", false) + val dm3 = new DomainMetadata("domain1", "", false) + + validateDomainMetadataConflictResolution( + engine, + tablePath, + currentTxn1DomainMetadatas = Seq(dm1), + winningTxn2DomainMetadatas = Seq(dm2), + winningTxn3DomainMetadatas = Seq(dm3), + expectedConflict = true + ) + } + } + + test( + "Conflict resolution - three concurrent txns have DomainMetadata w/ conflict domains - 2" + ) { + withTempDirAndEngine { (tablePath, engine) => + /** + * Txn1: include DomainMetadata action for "domain1". + * Txn2: include DomainMetadata action for "domain1". + * Txn3: include DomainMetadata action for "domain2". + * + * t1 ------------------------ Txn1 starts. + * t2 ------- Txn2 starts. + * t3 ------- Txn2 commits. + * t4 ------- Txn3 starts. + * t5 ------- Txn3 commits. + * t6 ------------------------ Txn1 commits (FAIL). 
+ */ + val dm1 = new DomainMetadata("domain1", "", false) + val dm2 = new DomainMetadata("domain1", "", false) + val dm3 = new DomainMetadata("domain2", "", false) + + validateDomainMetadataConflictResolution( + engine, + tablePath, + currentTxn1DomainMetadatas = Seq(dm1), + winningTxn2DomainMetadatas = Seq(dm2), + winningTxn3DomainMetadatas = Seq(dm3), + expectedConflict = true + ) + } + } + + test("Integration test - create a table with Spark and read its domain metadata using Kernel") { + withTempDir(dir => { + val tbl = "tbl" + withTable(tbl) { + val tablePath = dir.getCanonicalPath + // Create table with domain metadata enabled + spark.sql(s"CREATE TABLE $tbl (id LONG) USING delta LOCATION '$tablePath'") + spark.sql( + s"ALTER TABLE $tbl SET TBLPROPERTIES(" + + s"'delta.feature.domainMetadata' = 'enabled'," + + s"'delta.checkpointInterval' = '3')" + ) + + // Manually commit domain metadata actions. This will create 02.json + val deltaLog = DeltaLog.forTable(spark, new Path(tablePath)) + deltaLog + .startTransaction() + .commitManually( + List( + SparkDomainMetadata("testDomain1", "{\"key1\":\"1\"}", removed = false), + SparkDomainMetadata("testDomain2", "", removed = false), + SparkDomainMetadata("testDomain3", "", removed = false) + ): _* + ) + + // This will create 03.json and 03.checkpoint + spark.range(0, 2).write.format("delta").mode("append").save(tablePath) + + // Manually commit domain metadata actions. This will create 04.json + deltaLog + .startTransaction() + .commitManually( + List( + SparkDomainMetadata("testDomain1", "{\"key1\":\"10\"}", removed = false), + SparkDomainMetadata("testDomain2", "", removed = true) + ): _* + ) + + // Use Delta Kernel to read the table's domain metadata and verify the result. + // We will need to read 1 checkpoint file and 1 log file to replay the table. 
+ // The state of the domain metadata should be: + // testDomain1: "{\"key1\":\"10\"}", removed = false (from 04.json) + // testDomain2: "", removed = true (from 04.json) + // testDomain3: "", removed = false (from 03.checkpoint) + + val dm1 = new DomainMetadata("testDomain1", """{"key1":"10"}""", false) + val dm2 = new DomainMetadata("testDomain2", "", true) + val dm3 = new DomainMetadata("testDomain3", "", false) + + val snapshot = latestSnapshot(tablePath).asInstanceOf[SnapshotImpl] + assertDomainMetadata( + snapshot, + Map("testDomain1" -> dm1, "testDomain2" -> dm2, "testDomain3" -> dm3) + ) + } + }) + } + + test("RowTrackingMetadataDomain is serializable and deserializable") { + withTempDirAndEngine((tablePath, engine) => { + // Create a RowTrackingMetadataDomain + val rowTrackingMetadataDomain = new RowTrackingMetadataDomain(10) + + // Generate a DomainMetadata action from it and verify. Its configuration should be + // a JSON serialization of the rowTrackingMetadataDomain + val dm = rowTrackingMetadataDomain.toDomainMetadata + assert(dm.getDomain === rowTrackingMetadataDomain.getDomainName) + assert(dm.getConfiguration === """{"rowIdHighWaterMark":10}""") + + // Verify the deserialization from DomainMetadata action into concrete domain object + val deserializedDomain = RowTrackingMetadataDomain.fromJsonConfiguration(dm.getConfiguration) + assert(deserializedDomain.getDomainName === rowTrackingMetadataDomain.getDomainName) + assert( + rowTrackingMetadataDomain.getRowIdHighWaterMark + === deserializedDomain.getRowIdHighWaterMark + ) + + // Verify the domainMetadata can be committed and read back + createTableWithDomainMetadataSupported(engine, tablePath) + // Commit the domain metadata and verify + commitDomainMetadataAndVerify( + engine, + tablePath, + domainMetadatas = Seq(dm), + expectedValue = Map(rowTrackingMetadataDomain.getDomainName -> dm) + ) + + // Read the domain metadata back from the table snapshot + val table = Table.forPath(engine, 
tablePath) + val snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + val rowTrackingMetadataDomainFromSnapshot = + RowTrackingMetadataDomain.fromSnapshot(snapshot).get + + // Verify the domain metadata read back from the snapshot + assert( + rowTrackingMetadataDomain.getDomainName === + rowTrackingMetadataDomainFromSnapshot.getDomainName + ) + assert( + rowTrackingMetadataDomain.getRowIdHighWaterMark === + rowTrackingMetadataDomainFromSnapshot.getRowIdHighWaterMark + ) + }) + } +}