Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support write multi fragments or empty fragment in one spark task #3183

Merged
merged 6 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 1 addition & 14 deletions java/core/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use lance_datafusion::utils::StreamingWriteSource;
use crate::error::{Error, Result};
use crate::{
blocking_dataset::{BlockingDataset, NATIVE_DATASET},
ffi::JNIEnvExt,
traits::FromJString,
utils::extract_write_params,
RT,
Expand Down Expand Up @@ -77,7 +76,6 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiArray<'local
dataset_uri: JString,
arrow_array_addr: jlong,
arrow_schema_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
Expand All @@ -91,7 +89,6 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiArray<'local
dataset_uri,
arrow_array_addr,
arrow_schema_addr,
fragment_id,
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
Expand All @@ -108,7 +105,6 @@ fn inner_create_with_ffi_array<'local>(
dataset_uri: JString,
arrow_array_addr: jlong,
arrow_schema_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
Expand All @@ -131,7 +127,6 @@ fn inner_create_with_ffi_array<'local>(
create_fragment(
env,
dataset_uri,
fragment_id,
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
Expand All @@ -147,7 +142,6 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiStream<'a>(
_obj: JObject,
dataset_uri: JString,
arrow_array_stream_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
Expand All @@ -160,7 +154,6 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiStream<'a>(
&mut env,
dataset_uri,
arrow_array_stream_addr,
fragment_id,
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
Expand All @@ -176,7 +169,6 @@ fn inner_create_with_ffi_stream<'local>(
env: &mut JNIEnv<'local>,
dataset_uri: JString,
arrow_array_stream_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
Expand All @@ -189,7 +181,6 @@ fn inner_create_with_ffi_stream<'local>(
create_fragment(
env,
dataset_uri,
fragment_id,
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
Expand All @@ -203,7 +194,6 @@ fn inner_create_with_ffi_stream<'local>(
fn create_fragment<'a>(
env: &mut JNIEnv<'a>,
dataset_uri: JString,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
Expand All @@ -213,8 +203,6 @@ fn create_fragment<'a>(
) -> Result<JString<'a>> {
let path_str = dataset_uri.extract(env)?;

let fragment_id_opts = env.get_int_opt(&fragment_id)?;

let write_params = extract_write_params(
env,
&max_rows_per_file,
Expand All @@ -223,9 +211,8 @@ fn create_fragment<'a>(
&mode,
&storage_options_obj,
)?;
let fragment = RT.block_on(FileFragment::create(
let fragment = RT.block_on(FileFragment::create_fragments(
&path_str,
fragment_id_opts.unwrap_or(0) as usize,
source,
Some(write_params),
))?;
Expand Down
4 changes: 2 additions & 2 deletions java/core/lance-jni/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ pub fn extract_write_params(
if let Some(mode_val) = env.get_string_opt(mode)? {
write_params.mode = WriteMode::try_from(mode_val.as_str())?;
}
// Java code always sets the data storage version to Legacy for now
write_params.data_storage_version = Some(LanceFileVersion::Legacy);
// Java code always sets the data storage version to stable for now
write_params.data_storage_version = Some(LanceFileVersion::Stable);
let jmap = JMap::from_env(env, storage_options_obj)?;
let storage_options: HashMap<String, String> = env.with_local_frame(16, |env| {
let mut map = HashMap::new();
Expand Down
29 changes: 13 additions & 16 deletions java/core/src/main/java/com/lancedb/lance/Fragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.lancedb.lance;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.arrow.c.ArrowArray;
Expand All @@ -36,43 +37,39 @@ public class Fragment {
* @param datasetUri the dataset uri
* @param allocator the buffer allocator
* @param root the vector schema root
* @param fragmentId the fragment id
* @param params the write params
* @return the fragment metadata
*/
public static FragmentMetadata create(String datasetUri, BufferAllocator allocator,
VectorSchemaRoot root, Optional<Integer> fragmentId, WriteParams params) {
public static List<FragmentMetadata> create(String datasetUri, BufferAllocator allocator,
VectorSchemaRoot root, WriteParams params) {
Preconditions.checkNotNull(datasetUri);
Preconditions.checkNotNull(allocator);
Preconditions.checkNotNull(root);
Preconditions.checkNotNull(fragmentId);
Preconditions.checkNotNull(params);
try (ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);
ArrowArray arrowArray = ArrowArray.allocateNew(allocator)) {
Data.exportVectorSchemaRoot(allocator, root, null, arrowArray, arrowSchema);
return FragmentMetadata.fromJson(createWithFfiArray(datasetUri, arrowArray.memoryAddress(),
arrowSchema.memoryAddress(), fragmentId, params.getMaxRowsPerFile(),
params.getMaxRowsPerGroup(), params.getMaxBytesPerFile(), params.getMode(),
params.getStorageOptions()));
return FragmentMetadata.fromJsonArray(createWithFfiArray(datasetUri,
arrowArray.memoryAddress(), arrowSchema.memoryAddress(),
params.getMaxRowsPerFile(), params.getMaxRowsPerGroup(), params.getMaxBytesPerFile(),
params.getMode(), params.getStorageOptions()));
}
}

/**
* Create a fragment from the given arrow stream.
* @param datasetUri the dataset uri
* @param stream the arrow stream
* @param fragmentId the fragment id
* @param params the write params
* @return the fragment metadata
*/
public static FragmentMetadata create(String datasetUri, ArrowArrayStream stream,
Optional<Integer> fragmentId, WriteParams params) {
public static List<FragmentMetadata> create(String datasetUri, ArrowArrayStream stream,
WriteParams params) {
Preconditions.checkNotNull(datasetUri);
Preconditions.checkNotNull(stream);
Preconditions.checkNotNull(fragmentId);
Preconditions.checkNotNull(params);
return FragmentMetadata.fromJson(createWithFfiStream(datasetUri,
stream.memoryAddress(), fragmentId,
return FragmentMetadata.fromJsonArray(createWithFfiStream(datasetUri,
stream.memoryAddress(),
params.getMaxRowsPerFile(), params.getMaxRowsPerGroup(),
params.getMaxBytesPerFile(), params.getMode(), params.getStorageOptions()));
}
Expand All @@ -83,7 +80,7 @@ public static FragmentMetadata create(String datasetUri, ArrowArrayStream stream
* @return the json serialized fragment metadata
*/
private static native String createWithFfiArray(String datasetUri,
long arrowArrayMemoryAddress, long arrowSchemaMemoryAddress, Optional<Integer> fragmentId,
long arrowArrayMemoryAddress, long arrowSchemaMemoryAddress,
Optional<Integer> maxRowsPerFile, Optional<Integer> maxRowsPerGroup,
Optional<Long> maxBytesPerFile, Optional<String> mode, Map<String, String> storageOptions);

Expand All @@ -93,7 +90,7 @@ private static native String createWithFfiArray(String datasetUri,
* @return the json serialized fragment metadata
*/
private static native String createWithFfiStream(String datasetUri, long arrowStreamMemoryAddress,
Optional<Integer> fragmentId, Optional<Integer> maxRowsPerFile,
Optional<Integer> maxRowsPerFile,
Optional<Integer> maxRowsPerGroup, Optional<Long> maxBytesPerFile,
Optional<String> mode, Map<String, String> storageOptions);
}
27 changes: 27 additions & 0 deletions java/core/src/main/java/com/lancedb/lance/FragmentMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package com.lancedb.lance;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.util.Preconditions;
import org.json.JSONArray;
import org.json.JSONObject;
import org.apache.commons.lang3.builder.ToStringBuilder;

Expand Down Expand Up @@ -75,4 +79,27 @@ public static FragmentMetadata fromJson(String jsonMetadata) {
return new FragmentMetadata(jsonMetadata, metadata.getInt(ID_KEY),
metadata.getLong(PHYSICAL_ROWS_KEY));
}

/**
* Converts a JSON array string into a list of FragmentMetadata objects.
*
* @param jsonMetadata A JSON array string containing fragment metadata.
* @return A list of FragmentMetadata objects.
*/
public static List<FragmentMetadata> fromJsonArray(String jsonMetadata) {
Preconditions.checkNotNull(jsonMetadata);
JSONArray metadatas = new JSONArray(jsonMetadata);
List<FragmentMetadata> fragmentMetadataList = new ArrayList<>();
for (Object object : metadatas) {
JSONObject metadata = (JSONObject) object;
if (!metadata.has(ID_KEY) || !metadata.has(PHYSICAL_ROWS_KEY)) {
throw new IllegalArgumentException(
String.format("Fragment metadata must have {} and {} but is {}",
ID_KEY, PHYSICAL_ROWS_KEY, jsonMetadata));
}
fragmentMetadataList.add(new FragmentMetadata(metadata.toString(), metadata.getInt(ID_KEY),
metadata.getLong(PHYSICAL_ROWS_KEY)));
}
return fragmentMetadataList;
}
}
36 changes: 28 additions & 8 deletions java/core/src/test/java/com/lancedb/lance/FragmentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.pojo.Schema;
Expand All @@ -37,7 +37,7 @@ void testFragmentCreateFfiArray() {
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
testDataset.createNewFragment(123, 20);
testDataset.createNewFragment(20);
}
}

Expand All @@ -47,9 +47,8 @@ void testFragmentCreate() throws Exception {
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
int fragmentId = 312;
int rowCount = 21;
FragmentMetadata fragmentMeta = testDataset.createNewFragment(fragmentId, rowCount);
FragmentMetadata fragmentMeta = testDataset.createNewFragment(rowCount);

// Commit fragment
FragmentOperation.Append appendOp = new FragmentOperation.Append(Arrays.asList(fragmentMeta));
Expand All @@ -58,8 +57,7 @@ void testFragmentCreate() throws Exception {
assertEquals(2, dataset.latestVersion());
assertEquals(rowCount, dataset.countRows());
DatasetFragment fragment = dataset.getFragments().get(0);
assertEquals(fragmentId, fragment.getId());


try (LanceScanner scanner = fragment.newScan()) {
Schema schemaRes = scanner.schema();
assertEquals(testDataset.getSchema(), schemaRes);
Expand All @@ -74,7 +72,7 @@ void commitWithoutVersion() {
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
FragmentMetadata meta = testDataset.createNewFragment(123, 20);
FragmentMetadata meta = testDataset.createNewFragment(20);
FragmentOperation.Append appendOp = new FragmentOperation.Append(Arrays.asList(meta));
assertThrows(IllegalArgumentException.class, () -> {
Dataset.commit(allocator, datasetPath, appendOp, Optional.empty());
Expand All @@ -88,7 +86,7 @@ void commitOldVersion() {
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
FragmentMetadata meta = testDataset.createNewFragment(123, 20);
FragmentMetadata meta = testDataset.createNewFragment(20);
FragmentOperation.Append appendOp = new FragmentOperation.Append(Arrays.asList(meta));
assertThrows(IllegalArgumentException.class, () -> {
Dataset.commit(allocator, datasetPath, appendOp, Optional.of(0L));
Expand All @@ -107,4 +105,26 @@ void appendWithoutFragment() {
});
}
}

@Test
void testEmptyFragments() {
String datasetPath = tempDir.resolve("testEmptyFragments").toString();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
List<FragmentMetadata> fragments = testDataset.createNewFragment(0, 10);
assertEquals(0, fragments.size());
}
}

@Test
void testMultiFragments() {
String datasetPath = tempDir.resolve("testMultiFragments").toString();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
List<FragmentMetadata> fragments = testDataset.createNewFragment(20, 10);
assertEquals(2, fragments.size());
}
}
}
30 changes: 14 additions & 16 deletions java/core/src/test/java/com/lancedb/lance/ScannerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,17 +225,16 @@ void testScanFragment() throws Exception {
try (BufferAllocator allocator = new RootAllocator()) {
TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
int[] fragment0 = new int[]{0, 3};
int[] fragment1 = new int[]{1, 5};
int[] fragment2 = new int[]{2, 7};
FragmentMetadata metadata0 = testDataset.createNewFragment(fragment0[0], fragment0[1]);
FragmentMetadata metadata1 = testDataset.createNewFragment(fragment1[0], fragment1[1]);
FragmentMetadata metadata2 = testDataset.createNewFragment(fragment2[0], fragment2[1]);
FragmentMetadata metadata0 = testDataset.createNewFragment(3);
FragmentMetadata metadata1 = testDataset.createNewFragment(5);
FragmentMetadata metadata2 = testDataset.createNewFragment(7);
FragmentOperation.Append appendOp = new FragmentOperation.Append(Arrays.asList(metadata0, metadata1, metadata2));
try (Dataset dataset = Dataset.commit(allocator, datasetPath, appendOp, Optional.of(1L))) {
validScanResult(dataset, fragment0[0], fragment0[1]);
validScanResult(dataset, fragment1[0], fragment1[1]);
validScanResult(dataset, fragment2[0], fragment2[1]);
List<DatasetFragment> frags = dataset.getFragments();
assertEquals(3, frags.size());
validScanResult(dataset, frags.get(0).getId(), 3);
validScanResult(dataset, frags.get(1).getId(), 5);
validScanResult(dataset, frags.get(2).getId(), 7);
}
}
}
Expand All @@ -246,15 +245,14 @@ void testScanFragments() throws Exception {
try (BufferAllocator allocator = new RootAllocator()) {
TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
int[] fragment0 = new int[]{0, 3};
int[] fragment1 = new int[]{1, 5};
int[] fragment2 = new int[]{2, 7};
FragmentMetadata metadata0 = testDataset.createNewFragment(fragment0[0], fragment0[1]);
FragmentMetadata metadata1 = testDataset.createNewFragment(fragment1[0], fragment1[1]);
FragmentMetadata metadata2 = testDataset.createNewFragment(fragment2[0], fragment2[1]);
FragmentMetadata metadata0 = testDataset.createNewFragment(3);
FragmentMetadata metadata1 = testDataset.createNewFragment(5);
FragmentMetadata metadata2 = testDataset.createNewFragment(7);
FragmentOperation.Append appendOp = new FragmentOperation.Append(Arrays.asList(metadata0, metadata1, metadata2));
try (Dataset dataset = Dataset.commit(allocator, datasetPath, appendOp, Optional.of(1L))) {
try (Scanner scanner = dataset.newScan(new ScanOptions.Builder().batchSize(1024).fragmentIds(Arrays.asList(1, 2)).build())) {
List<DatasetFragment> frags = dataset.getFragments();
assertEquals(3, frags.size());
try (Scanner scanner = dataset.newScan(new ScanOptions.Builder().batchSize(1024).fragmentIds(Arrays.asList(frags.get(1).getId(), frags.get(2).getId())).build())) {
try (ArrowReader reader = scanner.scanBatches()) {
assertEquals(dataset.getSchema().getFields(), reader.getVectorSchemaRoot().getSchema().getFields());
int rowcount = 0;
Expand Down
Loading
Loading