Commit

Fix formatting
sopel39 committed Oct 29, 2024
1 parent babd228 commit f2ec813
Showing 2 changed files with 38 additions and 38 deletions.
@@ -338,8 +338,8 @@ public static Map<String, Object> getIcebergTableProperties(Table icebergTable)
public static Optional<String> getOrcBloomFilterColumns(Map<String, String> properties)
{
    Optional<String> orcBloomFilterColumns = Stream.of(
                    properties.get(ORC_BLOOM_FILTER_COLUMNS),
                    properties.get(BROKEN_ORC_BLOOM_FILTER_COLUMNS_KEY))
            .filter(Objects::nonNull)
            .findFirst();
    return orcBloomFilterColumns;
@@ -356,8 +356,8 @@ public static Set<String> getParquetBloomFilterColumns(Map<String, String> properties)
public static Optional<String> getOrcBloomFilterFpp(Map<String, String> properties)
{
    return Stream.of(
                    properties.get(ORC_BLOOM_FILTER_FPP),
                    properties.get(BROKEN_ORC_BLOOM_FILTER_FPP_KEY))
            .filter(Objects::nonNull)
            .findFirst();
}
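Both helpers rely on the same idiom: Stream.of(a, b).filter(Objects::nonNull).findFirst() yields the first non-null candidate, so the current property key takes precedence over the legacy "broken" key when both are present. A minimal, self-contained sketch of the idiom, with literal property keys standing in for the ORC_BLOOM_FILTER_COLUMNS and BROKEN_ORC_BLOOM_FILTER_COLUMNS_KEY constants (the key strings and values here are illustrative, not taken from the commit):

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;

public class FirstNonNullDemo
{
    public static void main(String[] args)
    {
        // Both keys set: the first candidate listed in Stream.of(...) wins.
        Map<String, String> properties = Map.of(
                "write.orc.bloom.filter.columns", "a,b",
                "orc.bloom.filter.columns", "legacy");

        Optional<String> columns = Stream.of(
                        properties.get("write.orc.bloom.filter.columns"),
                        properties.get("orc.bloom.filter.columns"))
                .filter(Objects::nonNull)
                .findFirst();

        System.out.println(columns); // prints: Optional[a,b]
    }
}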
@@ -1460,35 +1460,35 @@ public void testTrinoSparkConcurrentInsert()
QueryExecutor onTrino = onTrino();
QueryExecutor onSpark = onSpark();
List<Row> allInserted = executor.invokeAll(
        Stream.of(Engine.TRINO, Engine.SPARK)
                .map(engine -> (Callable<List<Row>>) () -> {
                    List<Row> inserted = new ArrayList<>();
                    for (int i = 0; i < insertsPerEngine; i++) {
                        barrier.await(20, SECONDS);
                        String engineName = engine.name().toLowerCase(ENGLISH);
                        long value = i;
                        switch (engine) {
                            case TRINO:
                                try {
                                    onTrino.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", trinoTableName, engineName, value));
                                }
                                catch (QueryExecutionException queryExecutionException) {
                                    // failed to insert
                                    continue; // next loop iteration
                                }
                                break;
                            case SPARK:
                                onSpark.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", sparkTableName, engineName, value));
                                break;
                            default:
                                throw new UnsupportedOperationException("Unexpected engine: " + engine);
                        }

                        inserted.add(row(engineName, value));
                    }
                    return inserted;
                })
                .collect(toImmutableList())).stream()
        .map(MoreFutures::getDone)
        .flatMap(List::stream)
        .collect(toImmutableList());
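The test coordinates the two engines with a barrier so that every insert round starts simultaneously; note that the catch branch awaits before continuing to the next iteration, so a failed Trino insert never leaves the Spark writer stranded at the rendezvous. A minimal sketch of this coordination pattern, assuming a CyclicBarrier-style barrier (as the barrier.await(20, SECONDS) call suggests) and plain printing tasks in place of the query executors:

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static java.util.concurrent.TimeUnit.SECONDS;

public class BarrierDemo
{
    public static void main(String[] args) throws Exception
    {
        // Two writers rendezvous before every round, like the Trino/Spark pair above.
        CyclicBarrier barrier = new CyclicBarrier(2);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Callable<Integer>> writers = List.of("writer-1", "writer-2").stream()
                .map(name -> (Callable<Integer>) () -> {
                    int written = 0;
                    for (int i = 0; i < 3; i++) {
                        barrier.await(20, SECONDS); // start each round together
                        System.out.printf("%s inserts row %d%n", name, i);
                        written++;
                    }
                    return written;
                })
                .toList();
        for (Future<Integer> future : executor.invokeAll(writers)) {
            System.out.println("rows written: " + future.get());
        }
        executor.shutdown();
    }
}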
@@ -2609,19 +2609,19 @@ public void testMetadataCompressionCodecGzip()
private void validatePartitioning(String baseTableName, String sparkTableName, List<Map<String, String>> expectedValues)
{
    List<String> trinoResult = expectedValues.stream().map(m ->
                    m.entrySet().stream()
                            .map(entry -> format("%s=%s", entry.getKey(), entry.getValue()))
                            .collect(Collectors.joining(", ", "{", "}")))
            .collect(toImmutableList());
    List<Object> partitioning = onTrino().executeQuery(format("SELECT partition, record_count FROM iceberg.default.\"%s$partitions\"", baseTableName))
            .column(1);
    Set<String> partitions = partitioning.stream().map(String::valueOf).collect(toUnmodifiableSet());
    assertThat(partitions.size()).isEqualTo(expectedValues.size());
    assertThat(partitions).containsAll(trinoResult);
    List<String> sparkResult = expectedValues.stream().map(m ->
                    m.entrySet().stream()
                            .map(entry -> format("\"%s\":%s", entry.getKey(), entry.getValue()))
                            .collect(Collectors.joining(",", "{", "}")))
            .collect(toImmutableList());
    partitioning = onSpark().executeQuery(format("SELECT partition from %s.files", sparkTableName)).column(1);
    partitions = partitioning.stream().map(String::valueOf).collect(toUnmodifiableSet());
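The helper builds the same expected partition values twice because the two engines render partition tuples differently: Trino's $partitions metadata table as {key=value, ...} and Spark's .files table as JSON-like {"key":value,...}. A small sketch of the two renderings, using a hypothetical single-column partition (the column name and value are illustrative):

import java.util.Map;
import java.util.stream.Collectors;

import static java.lang.String.format;

public class PartitionFormatDemo
{
    public static void main(String[] args)
    {
        Map<String, String> partition = Map.of("ts_day", "2024-10-29");

        // Expected shape compared against Trino's $partitions output: {ts_day=2024-10-29}
        String trino = partition.entrySet().stream()
                .map(entry -> format("%s=%s", entry.getKey(), entry.getValue()))
                .collect(Collectors.joining(", ", "{", "}"));

        // Expected shape compared against Spark's .files output: {"ts_day":2024-10-29}
        String spark = partition.entrySet().stream()
                .map(entry -> format("\"%s\":%s", entry.getKey(), entry.getValue()))
                .collect(Collectors.joining(",", "{", "}"));

        System.out.println(trino + " vs " + spark);
    }
}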