diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
index 84820e882ce65b..0a8ca3ec685ab5 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
@@ -338,8 +338,8 @@ public static Map<String, String> getIcebergTableProperties(Table icebergTable)
     public static Optional<String> getOrcBloomFilterColumns(Map<String, String> properties)
     {
         Optional<String> orcBloomFilterColumns = Stream.of(
-                properties.get(ORC_BLOOM_FILTER_COLUMNS),
-                properties.get(BROKEN_ORC_BLOOM_FILTER_COLUMNS_KEY))
+                        properties.get(ORC_BLOOM_FILTER_COLUMNS),
+                        properties.get(BROKEN_ORC_BLOOM_FILTER_COLUMNS_KEY))
                 .filter(Objects::nonNull)
                 .findFirst();
         return orcBloomFilterColumns;
@@ -356,8 +356,8 @@ public static Set<String> getParquetBloomFilterColumns(Map<String, String> properties)
     public static Optional<String> getOrcBloomFilterFpp(Map<String, String> properties)
     {
         return Stream.of(
-                properties.get(ORC_BLOOM_FILTER_FPP),
-                properties.get(BROKEN_ORC_BLOOM_FILTER_FPP_KEY))
+                        properties.get(ORC_BLOOM_FILTER_FPP),
+                        properties.get(BROKEN_ORC_BLOOM_FILTER_FPP_KEY))
                 .filter(Objects::nonNull)
                 .findFirst();
     }
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
index 4c1b6d62c2a693..3dc13476fefc91 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
@@ -1460,35 +1460,35 @@ public void testTrinoSparkConcurrentInsert()
         QueryExecutor onTrino = onTrino();
         QueryExecutor onSpark = onSpark();
         List<Row> allInserted = executor.invokeAll(
-                Stream.of(Engine.TRINO, Engine.SPARK)
-                        .map(engine -> (Callable<List<Row>>) () -> {
-                            List<Row> inserted = new ArrayList<>();
-                            for (int i = 0; i < insertsPerEngine; i++) {
-                                barrier.await(20, SECONDS);
-                                String engineName = engine.name().toLowerCase(ENGLISH);
-                                long value = i;
-                                switch (engine) {
-                                    case TRINO:
-                                        try {
-                                            onTrino.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", trinoTableName, engineName, value));
+                        Stream.of(Engine.TRINO, Engine.SPARK)
+                                .map(engine -> (Callable<List<Row>>) () -> {
+                                    List<Row> inserted = new ArrayList<>();
+                                    for (int i = 0; i < insertsPerEngine; i++) {
+                                        barrier.await(20, SECONDS);
+                                        String engineName = engine.name().toLowerCase(ENGLISH);
+                                        long value = i;
+                                        switch (engine) {
+                                            case TRINO:
+                                                try {
+                                                    onTrino.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", trinoTableName, engineName, value));
+                                                }
+                                                catch (QueryExecutionException queryExecutionException) {
+                                                    // failed to insert
+                                                    continue; // next loop iteration
+                                                }
+                                                break;
+                                            case SPARK:
+                                                onSpark.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", sparkTableName, engineName, value));
+                                                break;
+                                            default:
+                                                throw new UnsupportedOperationException("Unexpected engine: " + engine);
                                         }
-                                        catch (QueryExecutionException queryExecutionException) {
-                                            // failed to insert
-                                            continue; // next loop iteration
-                                        }
-                                        break;
-                                    case SPARK:
-                                        onSpark.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", sparkTableName, engineName, value));
-                                        break;
-                                    default:
-                                        throw new UnsupportedOperationException("Unexpected engine: " + engine);
-                                }
-
-                                inserted.add(row(engineName, value));
-                            }
-                            return inserted;
-                        })
-                        .collect(toImmutableList())).stream()
+
+                                        inserted.add(row(engineName, value));
+                                    }
+                                    return inserted;
+                                })
+                                .collect(toImmutableList())).stream()
                 .map(MoreFutures::getDone)
                 .flatMap(List::stream)
                 .collect(toImmutableList());
@@ -2609,9 +2609,9 @@ public void testMetadataCompressionCodecGzip()
     private void validatePartitioning(String baseTableName, String sparkTableName, List<Map<String, String>> expectedValues)
     {
         List<String> trinoResult = expectedValues.stream().map(m ->
-                m.entrySet().stream()
-                        .map(entry -> format("%s=%s", entry.getKey(), entry.getValue()))
-                        .collect(Collectors.joining(", ", "{", "}")))
+                        m.entrySet().stream()
+                                .map(entry -> format("%s=%s", entry.getKey(), entry.getValue()))
+                                .collect(Collectors.joining(", ", "{", "}")))
                 .collect(toImmutableList());
         List<Object> partitioning = onTrino().executeQuery(format("SELECT partition, record_count FROM iceberg.default.\"%s$partitions\"", baseTableName))
                 .column(1);
@@ -2619,9 +2619,9 @@ private void validatePartitioning(String baseTableName, String sparkTableName, List<Map<String, String>> expectedValues)
         assertThat(partitions.size()).isEqualTo(expectedValues.size());
         assertThat(partitions).containsAll(trinoResult);
         List<String> sparkResult = expectedValues.stream().map(m ->
-                m.entrySet().stream()
-                        .map(entry -> format("\"%s\":%s", entry.getKey(), entry.getValue()))
-                        .collect(Collectors.joining(",", "{", "}")))
+                        m.entrySet().stream()
+                                .map(entry -> format("\"%s\":%s", entry.getKey(), entry.getValue()))
+                                .collect(Collectors.joining(",", "{", "}")))
                 .collect(toImmutableList());
         partitioning = onSpark().executeQuery(format("SELECT partition from %s.files", sparkTableName)).column(1);
         partitions = partitioning.stream().map(String::valueOf).collect(toUnmodifiableSet());
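
Note for reviewers: the `IcebergUtil.java` hunks are indentation-only, but the code they touch implements a fallback lookup that prefers the current table-property key and falls back to a legacy ("broken") key. A minimal standalone sketch of that pattern, assuming illustrative key strings (the real constants live in `IcebergUtil`):

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;

public class PropertyFallbackSketch
{
    // Illustrative key names only; not the actual Iceberg property constants
    private static final String CURRENT_KEY = "orc.bloom.filter.columns";
    private static final String LEGACY_KEY = "orc_bloom_filter_columns";

    // Prefer the value under the current key, fall back to the legacy key, else empty
    static Optional<String> lookup(Map<String, String> properties)
    {
        return Stream.of(
                        properties.get(CURRENT_KEY),
                        properties.get(LEGACY_KEY))
                .filter(Objects::nonNull)
                .findFirst();
    }

    public static void main(String[] args)
    {
        System.out.println(lookup(Map.of(LEGACY_KEY, "c1,c2"))); // Optional[c1,c2]
        System.out.println(lookup(Map.of("unrelated", "x")));    // Optional.empty
    }
}
```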
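Similarly, the `testTrinoSparkConcurrentInsert` hunk only reindents the test body, which races Trino and Spark inserts by parking one thread per engine on a shared `CyclicBarrier` before each `INSERT`. A self-contained sketch of that coordination pattern, with plain strings standing in for the `Engine` enum and the query executors:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream;

import static java.util.concurrent.TimeUnit.SECONDS;

public class BarrierRaceSketch
{
    public static void main(String[] args)
            throws Exception
    {
        int iterations = 5;
        // One party per "engine", mirroring Stream.of(Engine.TRINO, Engine.SPARK)
        CyclicBarrier barrier = new CyclicBarrier(2);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Callable<String>> tasks = Stream.of("trino", "spark")
                .map(engine -> (Callable<String>) () -> {
                    for (int i = 0; i < iterations; i++) {
                        // Both threads block here, then issue their inserts at the same moment
                        barrier.await(20, SECONDS);
                        System.out.printf("%s inserts row %d%n", engine, i);
                    }
                    return engine + " done";
                })
                .toList();
        for (Future<String> future : executor.invokeAll(tasks)) {
            System.out.println(future.get());
        }
        executor.shutdown();
    }
}
```

The catch-and-continue on the Trino branch of the real test reflects that a concurrent Iceberg commit can legitimately fail under this contention; only successful inserts are added to the expected rows.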