Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] Assign base row ID to AddFile actions #3894

Open
wants to merge 37 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
94260ef
Added Domain Metadata support to Delta Kernel
qiyuandong-db Oct 31, 2024
7612613
Lazily load domain metadata during log replay.
qiyuandong-db Oct 31, 2024
2ac3985
Use an iterator to wrap the data action iterator for DM duplicate det…
qiyuandong-db Oct 31, 2024
807c896
Update kernel/kernel-api/src/main/java/io/delta/kernel/internal/Delta…
qiyuandong-db Oct 31, 2024
84d4ae6
Improve error message
qiyuandong-db Oct 31, 2024
509bc27
Rename a unit test
qiyuandong-db Oct 31, 2024
83f3b1e
Don't allow duplicate DMs when reading actions from the winning txn d…
qiyuandong-db Nov 1, 2024
7d7032e
Update error messages in the tests
qiyuandong-db Nov 1, 2024
dadc5a6
Fix typos in comments
qiyuandong-db Nov 3, 2024
0c188c7
Add an integration test with spark
qiyuandong-db Nov 4, 2024
7948fdf
Fix the JavadocGenerationFailed error in the Delta Kernel CI job.
qiyuandong-db Nov 4, 2024
6195e7f
Resolve PR comments.
qiyuandong-db Nov 5, 2024
fd06f6d
Update util method extractDomainMetadataMap.
qiyuandong-db Nov 5, 2024
3e30a41
Remove blank lines.
qiyuandong-db Nov 5, 2024
cec85cf
Address PR comments
qiyuandong-db Nov 6, 2024
8edcc9a
Address PR comments
qiyuandong-db Nov 7, 2024
baca1c5
Move domain metadata actions out of dataActions
qiyuandong-db Nov 7, 2024
7e0a172
Fix javafmt
qiyuandong-db Nov 7, 2024
127de8c
Use a set to check for unsupported writer features
qiyuandong-db Nov 10, 2024
c5f3672
Resolve PR comments
qiyuandong-db Nov 11, 2024
b2d6546
Move golden table to kernel tests.
qiyuandong-db Nov 11, 2024
cd7ddeb
Use getTestResourceFilePath in test to get golden table path.
qiyuandong-db Nov 11, 2024
68c77d8
Resolve git comments
qiyuandong-db Nov 12, 2024
5afec36
Move resolveDomainMetadataConflict into handleDomainMetadata
qiyuandong-db Nov 12, 2024
e358878
Move addDomainMetadata from TransactionImpl to TransactionBuilderImpl
qiyuandong-db Nov 12, 2024
4f56a2f
Use SUPPORTED_WRITER_FEATURES in validateWriteSupportedTable
qiyuandong-db Nov 12, 2024
189ec75
Remove the duplicate check when reading winning txn's DM. Change extr…
qiyuandong-db Nov 13, 2024
a4e3104
Fix nit
qiyuandong-db Nov 13, 2024
b0e4a65
Rename populateDomainMetadataMap
qiyuandong-db Nov 13, 2024
f89199c
Remove unused imports
qiyuandong-db Nov 13, 2024
a5c5726
Resolve PR comments
qiyuandong-db Nov 15, 2024
a893fa0
Create the integration test table using Spark
qiyuandong-db Nov 15, 2024
73faf8f
Move addDomainMetadata from TransactionBuilderImpl to TransactionImpl
qiyuandong-db Nov 20, 2024
9ce9b56
Update domainMetadataUnsupported error message
qiyuandong-db Nov 20, 2024
60a29dd
Update the error message in the test
qiyuandong-db Nov 20, 2024
b32c56a
Add JsonMetadataDomain & RowTrackingMetadataDomain
qiyuandong-db Nov 14, 2024
75c0005
Assign base row ID to AddFile actions
qiyuandong-db Nov 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static java.lang.String.format;

import io.delta.kernel.exceptions.*;
import io.delta.kernel.internal.actions.DomainMetadata;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.DataFileStatus;
Expand Down Expand Up @@ -274,6 +275,40 @@ public static KernelException invalidConfigurationValueException(
return new InvalidConfigurationValueException(key, value, helpMessage);
}

public static KernelException domainMetadataUnsupported() {
String message =
"Cannot commit DomainMetadata action(s) because the feature 'domainMetadata' "
+ "is not supported on this table.";
return new KernelException(message);
}

public static KernelException duplicateDomainMetadataAction(
String domain, DomainMetadata action1, DomainMetadata action2) {
String message =
String.format(
"Multiple actions detected for domain '%s' in single transaction: '%s' and '%s'. "
+ "Only one action per domain is allowed.",
domain, action1.toString(), action2.toString());
return new KernelException(message);
}

public static ConcurrentWriteException concurrentDomainMetadataAction(
DomainMetadata domainMetadataAttempt, DomainMetadata winningDomainMetadata) {
String message =
String.format(
"A concurrent writer added a domainMetadata action for the same domain: %s. "
+ "No domain-specific conflict resolution is available for this domain. "
+ "Attempted domainMetadata: %s. Winning domainMetadata: %s",
domainMetadataAttempt.getDomain(), domainMetadataAttempt, winningDomainMetadata);
return new ConcurrentWriteException(message);
}

public static KernelException rowIDAssignmentWithoutStats() {
return new KernelException(
"Cannot assign baseRowId to add action. "
+ "The number of records in this data file is missing.");
}
Comment on lines +307 to +310
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed, this can be an issue: if connectors don't populate numRecords stats in the addFile action that are committed, the commit will fail if row tracking is supported (note that this is still better than today where we always fail in that case since we don't support row tracking.

Question more for kernel folks: do we some guarantee or requirement that connectors populate numRecords? Are connectors that implement writes today (if any) populating numRecords?

In any case, I would word the exception so that it puts the burden more on the connector, for example:
"All add actions must have statistics that include the number of records when writing to a Delta table with the RowTracking table feature enabled."


/* ------------------------ HELPER METHODS ----------------------------- */
private static String formatTimestamp(long millisSinceEpochUTC) {
return new Timestamp(millisSinceEpochUTC).toInstant().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.delta.kernel.engine.CommitCoordinatorClientHandler;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.DomainMetadata;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
Expand All @@ -31,6 +32,7 @@
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.snapshot.TableCommitCoordinatorClientHandler;
import io.delta.kernel.types.StructType;
import java.util.Map;
import java.util.Optional;

/** Implementation of {@link Snapshot}. */
Expand Down Expand Up @@ -83,6 +85,17 @@ public Protocol getProtocol() {
return protocol;
}

/**
* Get the domain metadata map from the log replay, which lazily loads and replays a history of
* domain metadata actions, resolving them to produce the current state of the domain metadata.
*
* @return A map where the keys are domain names and the values are {@link DomainMetadata}
* objects.
*/
public Map<String, DomainMetadata> getDomainMetadataMap() {
return logReplay.getDomainMetadataMap();
}

public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) {
long minFileRetentionTimestampMillis =
System.currentTimeMillis() - TOMBSTONE_RETENTION.fromMetadata(metadata);
Expand Down Expand Up @@ -115,6 +128,15 @@ public Path getDataPath() {
return dataPath;
}

/**
* Returns the log replay object for this snapshot. Visible for testing.
*
* @return the {@link LogReplay} object
*/
public LogReplay getLogReplay() {
return logReplay;
}

/**
* Returns the timestamp of the latest commit of this snapshot. For an uninitialized snapshot,
* this returns -1.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class TableFeatures {
add("columnMapping");
add("typeWidening-preview");
add("typeWidening");
add(DOMAIN_METADATA_FEATURE_NAME);
add(ROW_TRACKING_FEATURE_NAME);
}
});

Expand All @@ -57,6 +59,15 @@ public class TableFeatures {
}
});

/** The feature name for domain metadata. */
public static final String DOMAIN_METADATA_FEATURE_NAME = "domainMetadata";

/** The feature name for row tracking. */
public static final String ROW_TRACKING_FEATURE_NAME = "rowTracking";

/** The minimum writer version required to support domain metadata. */
public static final int TABLE_FEATURES_MIN_WRITER_VERSION = 7;

////////////////////
// Helper Methods //
////////////////////
Expand Down Expand Up @@ -93,7 +104,8 @@ public static void validateReadSupportedTable(
* <li>protocol writer version 1.
* <li>protocol writer version 2 only with appendOnly feature enabled.
* <li>protocol writer version 7 with {@code appendOnly}, {@code inCommitTimestamp}, {@code
* columnMapping}, {@code typeWidening} feature enabled.
* columnMapping}, {@code typeWidening}, {@code domainMetadata}, {@code rowTracking} feature
* enabled.
* </ul>
*
* @param protocol Table protocol
Expand Down Expand Up @@ -125,20 +137,8 @@ public static void validateWriteSupportedTable(
throw unsupportedWriterProtocol(tablePath, minWriterVersion);
case 7:
for (String writerFeature : protocol.getWriterFeatures()) {
switch (writerFeature) {
// Only supported writer features as of today in Kernel
case "appendOnly":
break;
case "inCommitTimestamp":
break;
case "columnMapping":
break;
case "typeWidening-preview":
break;
case "typeWidening":
break;
default:
throw unsupportedWriterFeature(tablePath, writerFeature);
if (!SUPPORTED_WRITER_FEATURES.contains(writerFeature)) {
throw unsupportedWriterFeature(tablePath, writerFeature);
}
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.util.Clock;
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.InCommitTimestampUtils;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.internal.rowtracking.RowIDAssignmentResult;
import io.delta.kernel.internal.rowtracking.RowTracking;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterable;
import io.delta.kernel.utils.CloseableIterator;
Expand Down Expand Up @@ -76,6 +74,12 @@ public class TransactionImpl implements Transaction {
private Metadata metadata;
private boolean shouldUpdateMetadata;

// Contains domain metadata actions known prior to iterating and writing the data actions
private final List<DomainMetadata> domainMetadatas = new ArrayList<>();
// Contains domain metadata actions generated on the fly while writing the data actions
private CloseableIterator<DomainMetadata> domainMetadataIter =
toCloseableIterator(Collections.emptyIterator());
Comment on lines +77 to +81
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I probably will change this. Currently the list is only used in tests, and iterator is only used for row tracking's domain metadata. I’ll look for more use cases of domain metadata in Delta-Spark to see if there is a better way for managing DomainMetadata actions in a transaction.


private boolean closed; // To avoid trying to commit the same transaction again.

public TransactionImpl(
Expand Down Expand Up @@ -120,6 +124,23 @@ public StructType getSchema(Engine engine) {
return readSnapshot.getSchema(engine);
}

public Optional<SetTransaction> getSetTxnOpt() {
return setTxnOpt;
}

/**
* Internal API to add domain metadata actions for this transaction. Visible for testing.
*
* @param domainMetadatas List of domain metadata to be added to the transaction.
*/
public void addDomainMetadatas(List<DomainMetadata> domainMetadatas) {
this.domainMetadatas.addAll(domainMetadatas);
}

public List<DomainMetadata> getDomainMetadatas() {
return domainMetadatas;
}

@Override
public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> dataActions)
throws ConcurrentWriteException {
Expand All @@ -131,6 +152,16 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> data
CommitInfo attemptCommitInfo = generateCommitAction(engine);
updateMetadataWithICTIfRequired(
engine, attemptCommitInfo.getInCommitTimestamp(), readSnapshot.getVersion(engine));

// For Row Tracking, we assign base row IDs to all AddFiles inside dataActions that
// do not have it yet. If the high water mark has changed, we also emit a
// DomainMetadata action with the new high water mark.
RowIDAssignmentResult rowIDAssignmentResult =
RowTracking.assignBaseRowId(protocol, readSnapshot, FULL_SCHEMA, dataActions);
dataActions = rowIDAssignmentResult.getDataActions();
domainMetadataIter =
domainMetadataIter.combine(rowIDAssignmentResult.getDomainMetadatasIter());
Comment on lines +159 to +163
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done in this way because I remember we discussed that we want to keep dataActions only for data-related actions (AddFile, RemoveFile). If it’s acceptable to include DomainMetadata actions in dataActions, we could simplify this by appending potential DomainMetadata actions directly to dataActions.


int numRetries = 0;
do {
logger.info("Committing transaction as version = {}.", commitAsVersion);
Expand Down Expand Up @@ -223,9 +254,12 @@ private TransactionCommitResult doCommit(

try (CloseableIterator<Row> stageDataIter = dataActions.iterator()) {
// Create a new CloseableIterator that will return the metadata actions followed by the
// data actions.
// data actions, and then DomainMetadata actions. The order is crucial as DomainMetadata
// actions may depend on the data actions.
CloseableIterator<Row> dataAndMetadataActions =
toCloseableIterator(metadataActions.iterator()).combine(stageDataIter);
toCloseableIterator(metadataActions.iterator())
.combine(stageDataIter)
.combine(getDomainMetadataActions());

if (commitAsVersion == 0) {
// New table, create a delta log directory
Expand Down Expand Up @@ -265,10 +299,6 @@ public boolean isBlindAppend() {
return true;
}

public Optional<SetTransaction> getSetTxnOpt() {
return setTxnOpt;
}

/**
* Generates a timestamp which is greater than the commit timestamp of the readSnapshot. This can
* result in an additional file read and that this will only happen if ICT is enabled.
Expand Down Expand Up @@ -313,6 +343,29 @@ private Map<String, String> getOperationParameters() {
return Collections.emptyMap();
}

/**
* Returns an iterator of all domain metadata actions to be committed. Domain metadata actions
* exist in two places:
*
* <ol>
* <li>{@code List<DomainMetadata> domainMetadatas}: Contains domain metadata actions known
* prior to iterating and writing the data actions.
* <li>{@code CloseableIterator<DomainMetadata> domainMetadataIter}`: Contains domain metadata
* actions generated on the fly while writing the data actions.
* </ol>
*
* This method combines both sources of domain metadata actions and returns an iterator of all
* domain metadata action rows to be committed.
*
* @return a {@link CloseableIterator} of domain metadata action rows
*/
private CloseableIterator<Row> getDomainMetadataActions() {
// TODO: Implement the check for duplicate domain metadata and if the protocol supports here
return toCloseableIterator(domainMetadatas.listIterator())
.combine(domainMetadataIter)
.map(domainMetadata -> createDomainMetadataSingleAction(domainMetadata.toRow()));
}

/**
* Get the part of the schema of the table that needs the statistics to be collected per file.
*
Expand Down
Loading
Loading