[Kernel] Add JsonMetadataDomain and RowTrackingMetadataDomain #3893

Open
wants to merge 36 commits into
base: master
Changes from 1 commit
94260ef
Added Domain Metadata support to Delta Kernel
qiyuandong-db Oct 31, 2024
7612613
Lazily load domain metadata during log replay.
qiyuandong-db Oct 31, 2024
2ac3985
Use an iterator to wrap the data action iterator for DM duplicate det…
qiyuandong-db Oct 31, 2024
807c896
Update kernel/kernel-api/src/main/java/io/delta/kernel/internal/Delta…
qiyuandong-db Oct 31, 2024
84d4ae6
Improve error message
qiyuandong-db Oct 31, 2024
509bc27
Rename a unit test
qiyuandong-db Oct 31, 2024
83f3b1e
Don't allow duplicate DMs when reading actions from the winning txn d…
qiyuandong-db Nov 1, 2024
7d7032e
Update error messages in the tests
qiyuandong-db Nov 1, 2024
dadc5a6
Fix typos in comments
qiyuandong-db Nov 3, 2024
0c188c7
Add an integration test with spark
qiyuandong-db Nov 4, 2024
7948fdf
Fix the JavadocGenerationFailed error in the Delta Kernel CI job.
qiyuandong-db Nov 4, 2024
6195e7f
Resolve PR comments.
qiyuandong-db Nov 5, 2024
fd06f6d
Update util method extractDomainMetadataMap.
qiyuandong-db Nov 5, 2024
3e30a41
Remove blank lines.
qiyuandong-db Nov 5, 2024
cec85cf
Address PR comments
qiyuandong-db Nov 6, 2024
8edcc9a
Address PR comments
qiyuandong-db Nov 7, 2024
baca1c5
Move domain metadata actions out of dataActions
qiyuandong-db Nov 7, 2024
7e0a172
Fix javafmt
qiyuandong-db Nov 7, 2024
127de8c
Use a set to check for unsupported writer features
qiyuandong-db Nov 10, 2024
c5f3672
Resolve PR comments
qiyuandong-db Nov 11, 2024
b2d6546
Move golden table to kernel tests.
qiyuandong-db Nov 11, 2024
cd7ddeb
Use getTestResourceFilePath in test to get golden table path.
qiyuandong-db Nov 11, 2024
68c77d8
Resolve git comments
qiyuandong-db Nov 12, 2024
5afec36
Move resolveDomainMetadataConflict into handleDomainMetadata
qiyuandong-db Nov 12, 2024
e358878
Move addDomainMetadata from TransactionImpl to TransactionBuilderImpl
qiyuandong-db Nov 12, 2024
4f56a2f
Use SUPPORTED_WRITER_FEATURES in validateWriteSupportedTable
qiyuandong-db Nov 12, 2024
189ec75
Remove the duplicate check when reading winning txn's DM. Change extr…
qiyuandong-db Nov 13, 2024
a4e3104
Fix nit
qiyuandong-db Nov 13, 2024
b0e4a65
Rename populateDomainMetadataMap
qiyuandong-db Nov 13, 2024
f89199c
Remove unused imports
qiyuandong-db Nov 13, 2024
a5c5726
Resolve PR comments
qiyuandong-db Nov 15, 2024
a893fa0
Create the integration test table using Spark
qiyuandong-db Nov 15, 2024
73faf8f
Move addDomainMetadata from TransactionBuilderImpl to TransactionImpl
qiyuandong-db Nov 20, 2024
9ce9b56
Update domainMetadataUnsupported error message
qiyuandong-db Nov 20, 2024
60a29dd
Update the error message in the test
qiyuandong-db Nov 20, 2024
b32c56a
Add JsonMetadataDomain & RowTrackingMetadataDomain
qiyuandong-db Nov 14, 2024
@@ -0,0 +1,100 @@
/*
 * Copyright (2024) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.delta.kernel.internal.metadatadomain;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.*;
import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.actions.DomainMetadata;
import io.delta.kernel.internal.rowtracking.RowTrackingMetadataDomain;
import java.util.Optional;

/**
 * Abstract class representing a JSON metadata domain, whose configuration string is a JSON
 * serialization of a domain object. This class provides methods to serialize and deserialize a
 * metadata domain to and from JSON. Concrete implementations, such as {@link
 * RowTrackingMetadataDomain}, should extend this class to define a specific metadata domain.
 */
public abstract class JsonMetadataDomain {
  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
Review comment (Collaborator):

It would be good to review the default configuration used for serializing/deserializing, e.g. does it correctly fail if the JSON string has missing or extra fields?
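
A minimal, dependency-free sketch of the strictness this comment asks about, assuming a flat JSON object whose values are all integers. `StrictFlatJson` and its `parse` method are hypothetical names used only for illustration; they are not part of Delta Kernel or Jackson, and the PR itself relies on Jackson's `ObjectMapper`. With Jackson, the relevant switches would likely be `DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES` and `DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES`; which settings the PR should adopt is left open here.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not Delta Kernel API): strict parsing of a flat JSON object of integers. */
final class StrictFlatJson {
  // Matches exactly one "name": <integer> pair, with optional surrounding whitespace.
  private static final Pattern FIELD =
      Pattern.compile("\\s*\"([^\"]+)\"\\s*:\\s*(-?\\d+)\\s*");

  /** Parses the object and rejects any missing, unexpected, or duplicated field. */
  static Map<String, Long> parse(String json, Set<String> expectedFields) {
    String trimmed = json.trim();
    if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
      throw new IllegalArgumentException("Not a JSON object: " + json);
    }
    String body = trimmed.substring(1, trimmed.length() - 1);
    Map<String, Long> fields = new LinkedHashMap<>();
    if (!body.trim().isEmpty()) {
      // Limit -1 keeps trailing empty strings, so a trailing comma is rejected below.
      for (String pair : body.split(",", -1)) {
        Matcher m = FIELD.matcher(pair);
        if (!m.matches()) {
          throw new IllegalArgumentException("Unsupported field syntax: " + pair);
        }
        String name = m.group(1);
        if (!expectedFields.contains(name)) {
          throw new IllegalArgumentException("Unexpected field: " + name);
        }
        if (fields.put(name, Long.parseLong(m.group(2))) != null) {
          throw new IllegalArgumentException("Duplicate field: " + name);
        }
      }
    }
    for (String name : expectedFields) {
      if (!fields.containsKey(name)) {
        throw new IllegalArgumentException("Missing field: " + name);
      }
    }
    return fields;
  }

  public static void main(String[] args) {
    Set<String> expected = Collections.singleton("rowIdHighWaterMark");
    System.out.println(parse("{\"rowIdHighWaterMark\":10}", expected));
    try {
      parse("{\"rowIdHighWaterMark\":10,\"extra\":1}", expected);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

The point of the sketch is the two checks around the loop: unknown names are rejected as they are read, and the expected set is verified afterwards so that an absent field cannot silently fall back to a default value.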


  /**
   * Deserializes a JSON string into an instance of the specified metadata domain.
   *
   * @param json the JSON string to deserialize
   * @param clazz the concrete class of the metadata domain object to deserialize into
   * @param <T> the type of the object
   * @return the deserialized object
   * @throws KernelException if the JSON string cannot be parsed
   */
  protected static <T> T fromJsonConfiguration(String json, Class<T> clazz) {
    try {
      return OBJECT_MAPPER.readValue(json, clazz);
    } catch (JsonProcessingException e) {
      throw new KernelException("Could not parse a JSON to a JsonMetadataDomain object", e);
    }
  }

  /**
   * Retrieves the domain metadata from a snapshot for a given domain, and deserializes it into an
   * instance of the specified metadata domain class.
   *
   * @param snapshot the snapshot to read from
   * @param clazz the metadata domain class of the object to deserialize into
   * @param domainName the name of the domain
   * @param <T> the type of the metadata domain object
   * @return an Optional containing the deserialized object if the domain metadata is found,
   *     otherwise an empty Optional
   */
  protected static <T> Optional<T> fromSnapshot(
      SnapshotImpl snapshot, Class<T> clazz, String domainName) {
    return Optional.ofNullable(snapshot.getDomainMetadataMap().get(domainName))
        .map(domainMetadata -> fromJsonConfiguration(domainMetadata.getConfiguration(), clazz));
  }

  /**
   * Returns the name of the domain.
   *
   * @return the domain name
   */
  @JsonIgnore
  public abstract String getDomainName();

  /**
   * Serializes this object into a JSON string.
   *
   * @return the JSON string representation of this object
   * @throws KernelException if the object cannot be serialized
   */
  public String toJsonConfiguration() {
    try {
      return OBJECT_MAPPER.writeValueAsString(this);
    } catch (JsonProcessingException e) {
      throw new KernelException("Could not serialize a JsonMetadataDomain object to JSON", e);
    }
  }

  /**
   * Generate a {@link DomainMetadata} action from this metadata domain.
   *
   * @return the DomainMetadata action instance
   */
  public DomainMetadata toDomainMetadata() {
    return new DomainMetadata(getDomainName(), toJsonConfiguration(), false);
  }
}
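
The lookup in `fromSnapshot` above follows a common `Optional` pattern: wrap a possibly-null map entry and parse it only when it exists. The sketch below isolates that pattern with JDK types only. `DomainLookupSketch`, `fromConfigMap`, and the plain `Map<String, String>` are assumptions standing in for `SnapshotImpl.getDomainMetadataMap()` and the Jackson parser; they are not Delta Kernel API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical stand-in for the snapshot lookup; uses a plain map instead of SnapshotImpl. */
final class DomainLookupSketch {

  /**
   * Returns the parsed configuration for a domain, or an empty Optional when the domain is
   * absent. The parser is only invoked when an entry exists.
   */
  static <T> Optional<T> fromConfigMap(
      Map<String, String> configByDomain, String domainName, Function<String, T> parser) {
    return Optional.ofNullable(configByDomain.get(domainName)).map(parser);
  }

  public static void main(String[] args) {
    Map<String, String> configs = new HashMap<>();
    configs.put("delta.rowTracking", "{\"rowIdHighWaterMark\":10}");

    // String::length is a placeholder parser; a real one would deserialize the JSON.
    Optional<Integer> present = fromConfigMap(configs, "delta.rowTracking", String::length);
    Optional<Integer> absent = fromConfigMap(configs, "example.otherDomain", String::length);

    System.out.println("present: " + present.isPresent());
    System.out.println("absent: " + absent.isPresent());
  }
}
```

One consequence of this shape is that a missing domain and a malformed configuration are reported differently: the former yields an empty `Optional`, while the latter surfaces whatever exception the parser throws.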
@@ -0,0 +1,77 @@
/*
 * Copyright (2024) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.delta.kernel.internal.rowtracking;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.metadatadomain.JsonMetadataDomain;
import java.util.Optional;

/** Represents the metadata domain for row tracking. */
public class RowTrackingMetadataDomain extends JsonMetadataDomain {

  /**
   * Creates an instance of {@link RowTrackingMetadataDomain} from a JSON configuration string.
   *
   * @param json the JSON configuration string
   * @return an instance of {@link RowTrackingMetadataDomain}
   */
  public static RowTrackingMetadataDomain fromJsonConfiguration(String json) {
    return JsonMetadataDomain.fromJsonConfiguration(json, RowTrackingMetadataDomain.class);
  }

  /**
   * Creates an instance of {@link RowTrackingMetadataDomain} from a {@link SnapshotImpl}.
   *
   * @param snapshot the snapshot instance
   * @return an {@link Optional} containing the {@link RowTrackingMetadataDomain} if present
   */
  public static Optional<RowTrackingMetadataDomain> fromSnapshot(SnapshotImpl snapshot) {
    return JsonMetadataDomain.fromSnapshot(snapshot, RowTrackingMetadataDomain.class, DOMAIN_NAME);
  }
Review comment (Collaborator) on lines +27 to +45:

Move these static factory methods either below the constructor or to the end of the class.


  public static final String DOMAIN_NAME = "delta.rowTracking";

  /** Default value for row ID high watermark when it is missing in the table */
  public static final long MISSING_ROW_ID_HIGH_WATERMARK = -1L;
Review comment (Collaborator) on lines +49 to +50:

This constant isn't used yet; add it in the follow-up change that populates row IDs.


  /** The highest assigned fresh row id for the table */
  private long rowIdHighWaterMark;

  /**
   * Constructs a RowTrackingMetadataDomain with the specified row ID high water mark.
   *
   * @param rowIdHighWaterMark the row ID high water mark
   */
  @JsonCreator
  public RowTrackingMetadataDomain(@JsonProperty("rowIdHighWaterMark") long rowIdHighWaterMark) {
    this.rowIdHighWaterMark = rowIdHighWaterMark;
  }

  @Override
  public String getDomainName() {
    return DOMAIN_NAME;
  }

  public long getRowIdHighWaterMark() {
    return rowIdHighWaterMark;
  }

  public void setRowIdHighWaterMark(long rowIdHighWaterMark) {
    this.rowIdHighWaterMark = rowIdHighWaterMark;
  }
}
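
The two classes above split responsibilities: the base class knows how to turn any domain into a `DomainMetadata` action, and the subclass supplies only a name and its fields. A rough, dependency-free sketch of that split follows. Every type in it (`DomainSketch`, `Action`, `Domain`, `RowTracking`) is a hypothetical stand-in rather than Delta Kernel API, and the hand-written JSON string replaces Jackson purely to keep the example self-contained.

```java
/** Dependency-free sketch; all nested types are hypothetical stand-ins, not Delta Kernel API. */
final class DomainSketch {

  /** Simplified stand-in for a DomainMetadata action: domain name, configuration, removed flag. */
  static final class Action {
    final String domain;
    final String configuration;
    final boolean removed;

    Action(String domain, String configuration, boolean removed) {
      this.domain = domain;
      this.configuration = configuration;
      this.removed = removed;
    }
  }

  /** Base class: subclasses supply the name and the JSON; the base class builds the action. */
  abstract static class Domain {
    abstract String getDomainName();

    abstract String toJsonConfiguration();

    Action toDomainMetadata() {
      // Mirrors the PR's toDomainMetadata(), which passes false as the third argument.
      return new Action(getDomainName(), toJsonConfiguration(), false);
    }
  }

  /** Concrete domain carrying a single long field. */
  static final class RowTracking extends Domain {
    private final long rowIdHighWaterMark;

    RowTracking(long rowIdHighWaterMark) {
      this.rowIdHighWaterMark = rowIdHighWaterMark;
    }

    @Override
    String getDomainName() {
      return "delta.rowTracking";
    }

    @Override
    String toJsonConfiguration() {
      // Hand-written JSON stands in for ObjectMapper.writeValueAsString in this sketch.
      return "{\"rowIdHighWaterMark\":" + rowIdHighWaterMark + "}";
    }
  }

  public static void main(String[] args) {
    Action action = new RowTracking(10).toDomainMetadata();
    System.out.println(
        action.domain + " -> " + action.configuration + " (removed=" + action.removed + ")");
  }
}
```

Adding another domain under this arrangement means writing one more subclass; the action-building logic in the base class stays untouched.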
@@ -22,6 +22,7 @@ import io.delta.kernel.exceptions._
import io.delta.kernel.internal.{SnapshotImpl, TableImpl, TransactionBuilderImpl, TransactionImpl}
import io.delta.kernel.internal.actions.{DomainMetadata, Protocol, SingleAction}
import io.delta.kernel.internal.util.Utils.toCloseableIterator
import io.delta.kernel.internal.rowtracking.RowTrackingMetadataDomain
import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.DeltaLog
@@ -472,4 +473,51 @@ class DomainMetadataSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase
}
})
}

  test("RowTrackingMetadataDomain is serializable and deserializable") {
    withTempDirAndEngine((tablePath, engine) => {
      // Create a RowTrackingMetadataDomain
      val rowTrackingMetadataDomain = new RowTrackingMetadataDomain(10)

      // Generate a DomainMetadata action from it and verify. Its configuration should be
      // a JSON serialization of the rowTrackingMetadataDomain
      val dm = rowTrackingMetadataDomain.toDomainMetadata
      assert(dm.getDomain === rowTrackingMetadataDomain.getDomainName)
      assert(dm.getConfiguration === """{"rowIdHighWaterMark":10}""")

      // Verify the deserialization from DomainMetadata action into concrete domain object
      val deserializedDomain = RowTrackingMetadataDomain.fromJsonConfiguration(dm.getConfiguration)
      assert(deserializedDomain.getDomainName === rowTrackingMetadataDomain.getDomainName)
      assert(
        rowTrackingMetadataDomain.getRowIdHighWaterMark
          === deserializedDomain.getRowIdHighWaterMark
      )

      // Verify the domainMetadata can be committed and read back
      createTableWithDomainMetadataSupported(engine, tablePath)
      // Commit the domain metadata and verify
      commitDomainMetadataAndVerify(
        engine,
        tablePath,
        domainMetadatas = Seq(dm),
        expectedValue = Map(rowTrackingMetadataDomain.getDomainName -> dm)
      )

      // Read the domain metadata back from the table snapshot
      val table = Table.forPath(engine, tablePath)
      val snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl]
      val rowTrackingMetadataDomainFromSnapshot =
        RowTrackingMetadataDomain.fromSnapshot(snapshot).get

      // Verify the domain metadata read back from the snapshot
      assert(
        rowTrackingMetadataDomain.getDomainName ===
          rowTrackingMetadataDomainFromSnapshot.getDomainName
      )
      assert(
        rowTrackingMetadataDomain.getRowIdHighWaterMark ===
          rowTrackingMetadataDomainFromSnapshot.getRowIdHighWaterMark
      )
    })
  }
}