Skip to content

Commit

Permalink
Update naming in Iceberg Trino/Spark compatibility test
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Aug 2, 2021
1 parent 61077e6 commit adb45f3
Showing 1 changed file with 68 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,24 @@
import static java.lang.String.format;
import static org.testng.Assert.assertEquals;

public class TestSparkCompatibility
public class TestIcebergSparkCompatibility
extends ProductTest
{
// TODO: Spark SQL doesn't yet support decimal. When it does add it to the test.
// TODO: Spark SQL only stores TIMESTAMP WITH TIME ZONE, and Iceberg only supports
// TIMESTAMP with no time zone. The Spark writes/Presto reads test can pass by
// TIMESTAMP with no time zone. The Spark writes/Trino reads test can pass by
// stripping off the UTC. However, I haven't been able to get the
// Presto writes/Spark reads test TIMESTAMPs to match.
// Trino writes/Spark reads test TIMESTAMPs to match.

// see spark-defaults.conf
private static final String SPARK_CATALOG = "iceberg_test";
private static final String PRESTO_CATALOG = "iceberg";
private static final String TRINO_CATALOG = "iceberg";
private static final String TEST_SCHEMA_NAME = "default";

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testPrestoReadingSparkData()
public void testTrinoReadingSparkData()
{
String baseTableName = "test_presto_reading_primitive_types";
String baseTableName = "test_trino_reading_primitive_types";
String sparkTableName = sparkTableName(baseTableName);

String sparkTableDefinition =
Expand All @@ -64,9 +64,9 @@ public void testPrestoReadingSparkData()
onSpark().executeQuery(format(sparkTableDefinition, sparkTableName));

// Validate queries on an empty table created by Spark
String snapshotsTable = prestoTableName("\"" + baseTableName + "$snapshots\"");
String snapshotsTable = trinoTableName("\"" + baseTableName + "$snapshots\"");
assertThat(onTrino().executeQuery(format("SELECT * FROM %s", snapshotsTable))).hasNoRows();
QueryResult emptyResult = onTrino().executeQuery(format("SELECT * FROM %s", prestoTableName(baseTableName)));
QueryResult emptyResult = onTrino().executeQuery(format("SELECT * FROM %s", trinoTableName(baseTableName)));
assertThat(emptyResult).hasNoRows();

String values = "VALUES (" +
Expand Down Expand Up @@ -96,19 +96,19 @@ public void testPrestoReadingSparkData()
QueryResult sparkSelect = onSpark().executeQuery(format("%s, _timestamp, _date FROM %s", startOfSelect, sparkTableName));
assertThat(sparkSelect).containsOnly(row);

QueryResult prestoSelect = onTrino().executeQuery(format("%s, CAST(_timestamp AS TIMESTAMP), _date FROM %s", startOfSelect, prestoTableName(baseTableName)));
assertThat(prestoSelect).containsOnly(row);
QueryResult trinoSelect = onTrino().executeQuery(format("%s, CAST(_timestamp AS TIMESTAMP), _date FROM %s", startOfSelect, trinoTableName(baseTableName)));
assertThat(trinoSelect).containsOnly(row);

onSpark().executeQuery("DROP TABLE " + sparkTableName);
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testSparkReadingPrestoData()
public void testSparkReadingTrinoData()
{
String baseTableName = "test_spark_reading_primitive_types";
String prestoTableName = prestoTableName(baseTableName);
String trinoTableName = trinoTableName(baseTableName);

String prestoTableDefinition =
String trinoTableDefinition =
"CREATE TABLE %s (" +
" _string VARCHAR" +
", _bigint BIGINT" +
Expand All @@ -119,7 +119,7 @@ public void testSparkReadingPrestoData()
//", _timestamp TIMESTAMP" +
", _date DATE" +
") WITH (format = 'ORC')";
onTrino().executeQuery(format(prestoTableDefinition, prestoTableName));
onTrino().executeQuery(format(trinoTableDefinition, trinoTableName));

String values = "VALUES (" +
"'a_string'" +
Expand All @@ -131,7 +131,7 @@ public void testSparkReadingPrestoData()
//", TIMESTAMP '2020-06-28 14:16:00.456'" +
", DATE '1950-06-28'" +
")";
String insert = format("INSERT INTO %s %s", prestoTableName, values);
String insert = format("INSERT INTO %s %s", trinoTableName, values);
onTrino().executeQuery(insert);

Row row = row(
Expand All @@ -144,52 +144,52 @@ public void testSparkReadingPrestoData()
//"2020-06-28 14:16:00.456",
"1950-06-28");
String startOfSelect = "SELECT _string, _bigint, _integer, _real, _double, _boolean";
QueryResult prestoSelect = onTrino().executeQuery(format("%s, /* CAST(_timestamp AS VARCHAR),*/ CAST(_date AS VARCHAR) FROM %s", startOfSelect, prestoTableName));
assertThat(prestoSelect).containsOnly(row);
QueryResult trinoSelect = onTrino().executeQuery(format("%s, /* CAST(_timestamp AS VARCHAR),*/ CAST(_date AS VARCHAR) FROM %s", startOfSelect, trinoTableName));
assertThat(trinoSelect).containsOnly(row);

QueryResult sparkSelect = onSpark().executeQuery(format("%s, /* CAST(_timestamp AS STRING),*/ CAST(_date AS STRING) FROM %s", startOfSelect, sparkTableName(baseTableName)));
assertThat(sparkSelect).containsOnly(row);

onTrino().executeQuery("DROP TABLE " + prestoTableName);
onTrino().executeQuery("DROP TABLE " + trinoTableName);
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testSparkCreatesPrestoDrops()
public void testSparkCreatesTrinoDrops()
{
String baseTableName = "test_spark_creates_presto_drops";
String baseTableName = "test_spark_creates_trino_drops";
onSpark().executeQuery(format("CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG", sparkTableName(baseTableName)));
onTrino().executeQuery("DROP TABLE " + prestoTableName(baseTableName));
onTrino().executeQuery("DROP TABLE " + trinoTableName(baseTableName));
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testPrestoCreatesSparkDrops()
public void testTrinoCreatesSparkDrops()
{
String baseTableName = "test_presto_creates_spark_drops";
onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT)", prestoTableName(baseTableName)));
String baseTableName = "test_trino_creates_spark_drops";
onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT)", trinoTableName(baseTableName)));
onSpark().executeQuery("DROP TABLE " + sparkTableName(baseTableName));
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testSparkReadsPrestoPartitionedTable()
public void testSparkReadsTrinoPartitionedTable()
{
String baseTableName = "test_spark_reads_presto_partitioned_table";
String prestoTableName = prestoTableName(baseTableName);
onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'])", prestoTableName));
onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001), ('b', 1002), ('c', 1003)", prestoTableName));
String baseTableName = "test_spark_reads_trino_partitioned_table";
String trinoTableName = trinoTableName(baseTableName);
onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'])", trinoTableName));
onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001), ('b', 1002), ('c', 1003)", trinoTableName));

Row row = row("b", 1002);
String select = "SELECT * FROM %s WHERE _string = 'b'";
assertThat(onTrino().executeQuery(format(select, prestoTableName)))
assertThat(onTrino().executeQuery(format(select, trinoTableName)))
.containsOnly(row);
assertThat(onSpark().executeQuery(format(select, sparkTableName(baseTableName))))
.containsOnly(row);
onTrino().executeQuery("DROP TABLE " + prestoTableName);
onTrino().executeQuery("DROP TABLE " + trinoTableName);
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testPrestoReadsSparkPartitionedTable()
public void testTrinoReadsSparkPartitionedTable()
{
String baseTableName = "test_spark_reads_presto_partitioned_table";
String baseTableName = "test_spark_reads_trino_partitioned_table";
String sparkTableName = sparkTableName(baseTableName);
onSpark().executeQuery(format("CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG PARTITIONED BY (_string)", sparkTableName));
onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1001), ('b', 1002), ('c', 1003)", sparkTableName));
Expand All @@ -198,16 +198,16 @@ public void testPrestoReadsSparkPartitionedTable()
String select = "SELECT * FROM %s WHERE _string = 'b'";
assertThat(onSpark().executeQuery(format(select, sparkTableName)))
.containsOnly(row);
assertThat(onTrino().executeQuery(format(select, prestoTableName(baseTableName))))
assertThat(onTrino().executeQuery(format(select, trinoTableName(baseTableName))))
.containsOnly(row);

onSpark().executeQuery("DROP TABLE " + sparkTableName);
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testPrestoReadingCompositeSparkData()
public void testTrinoReadingCompositeSparkData()
{
String baseTableName = "test_presto_reading_spark_composites";
String baseTableName = "test_trino_reading_spark_composites";
String sparkTableName = sparkTableName(baseTableName);

String sparkTableDefinition = "" +
Expand All @@ -224,30 +224,30 @@ public void testPrestoReadingCompositeSparkData()
"named_struct('name', 'Santa', 'surname', 'Claus','age', 1000,'gender', 'MALE')";
onSpark().executeQuery(format(insert, sparkTableName));

String prestoTableName = prestoTableName(baseTableName);
String prestoSelect = "SELECT doc_id, info['age'], pets[2], user_info.surname FROM " + prestoTableName;
String trinoTableName = trinoTableName(baseTableName);
String trinoSelect = "SELECT doc_id, info['age'], pets[2], user_info.surname FROM " + trinoTableName;

QueryResult prestoResult = onTrino().executeQuery(prestoSelect);
QueryResult trinoResult = onTrino().executeQuery(trinoSelect);
Row row = row("Doc213", 28, "Cat", "Claus");
assertThat(prestoResult).containsOnly(row);
assertThat(trinoResult).containsOnly(row);
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testSparkReadingCompositePrestoData()
public void testSparkReadingCompositeTrinoData()
{
String baseTableName = "test_spark_reading_presto_composites";
String prestoTableName = prestoTableName(baseTableName);
String baseTableName = "test_spark_reading_trino_composites";
String trinoTableName = trinoTableName(baseTableName);

String prestoTableDefinition = "" +
String trinoTableDefinition = "" +
"CREATE TABLE %s (" +
" doc_id VARCHAR,\n" +
" info MAP(VARCHAR, INTEGER),\n" +
" pets ARRAY(VARCHAR),\n" +
" user_info ROW(name VARCHAR, surname VARCHAR, age INTEGER, gender VARCHAR))";
onTrino().executeQuery(format(prestoTableDefinition, prestoTableName));
onTrino().executeQuery(format(trinoTableDefinition, trinoTableName));

String insert = "INSERT INTO %s VALUES('Doc213', MAP(ARRAY['age', 'children'], ARRAY[28, 3]), ARRAY['Dog', 'Cat', 'Pig'], ROW('Santa', 'Claus', 1000, 'MALE'))";
onTrino().executeQuery(format(insert, prestoTableName));
onTrino().executeQuery(format(insert, trinoTableName));

String sparkTableName = sparkTableName(baseTableName);
String sparkSelect = "SELECT doc_id, info['age'], pets[1], user_info.surname FROM " + sparkTableName;
Expand All @@ -257,9 +257,9 @@ public void testSparkReadingCompositePrestoData()
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testPrestoReadingNestedSparkData()
public void testTrinoReadingNestedSparkData()
{
String baseTableName = "test_presto_reading_nested_spark_data";
String baseTableName = "test_trino_reading_nested_spark_data";
String sparkTableName = sparkTableName(baseTableName);

String sparkTableDefinition = "" +
Expand Down Expand Up @@ -297,8 +297,8 @@ public void testPrestoReadingNestedSparkData()
QueryResult sparkResult = onSpark().executeQuery(sparkSelect + sparkTableName);
assertThat(sparkResult).containsOnly(row);

String prestoTableName = prestoTableName(baseTableName);
String prestoSelect = "SELECT" +
String trinoTableName = trinoTableName(baseTableName);
String trinoSelect = "SELECT" +
" doc_id" +
", nested_map['s1'][2].sname" +
", nested_map['s1'][1].snumber" +
Expand All @@ -308,23 +308,23 @@ public void testPrestoReadingNestedSparkData()
", nested_struct.complicated[2]['m2'][2].mnumber" +
" FROM ";

QueryResult prestoResult = onTrino().executeQuery(prestoSelect + prestoTableName);
assertThat(prestoResult).containsOnly(row);
QueryResult trinoResult = onTrino().executeQuery(trinoSelect + trinoTableName);
assertThat(trinoResult).containsOnly(row);
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testSparkReadingNestedPrestoData()
public void testSparkReadingNestedTrinoData()
{
String baseTableName = "test_spark_reading_nested_presto_data";
String prestoTableName = prestoTableName(baseTableName);
String baseTableName = "test_spark_reading_nested_trino_data";
String trinoTableName = trinoTableName(baseTableName);

String prestoTableDefinition = "" +
String trinoTableDefinition = "" +
"CREATE TABLE %s (\n" +
" doc_id VARCHAR\n" +
", nested_map MAP(VARCHAR, ARRAY(ROW(sname VARCHAR, snumber INT)))\n" +
", nested_array ARRAY(MAP(VARCHAR, ARRAY(ROW(mname VARCHAR, mnumber INT))))\n" +
", nested_struct ROW(name VARCHAR, complicated ARRAY(MAP(VARCHAR, ARRAY(ROW(mname VARCHAR, mnumber INT))))))";
onTrino().executeQuery(format(prestoTableDefinition, prestoTableName));
onTrino().executeQuery(format(trinoTableDefinition, trinoTableName));

String insert = "" +
"INSERT INTO %s SELECT" +
Expand All @@ -335,11 +335,11 @@ public void testSparkReadingNestedPrestoData()
", row('S1'" +
" ,array[map(array['m1'], array[array[row('SAMA1Name1', 301), row('SAMA1Name2', 302)]])" +
" ,map(array['m2'], array[array[row('SAMA2Name1', 401), row('SAMA2Name2', 402)]])])";
onTrino().executeQuery(format(insert, prestoTableName));
onTrino().executeQuery(format(insert, trinoTableName));

Row row = row("Doc213", "ASName2", 201, "MAS2Name1", 302, "SAMA1Name1", 402);

String prestoSelect = "SELECT" +
String trinoSelect = "SELECT" +
" doc_id" +
", nested_map['s1'][2].sname" +
", nested_map['s1'][1].snumber" +
Expand All @@ -349,8 +349,8 @@ public void testSparkReadingNestedPrestoData()
", nested_struct.complicated[2]['m2'][2].mnumber" +
" FROM ";

QueryResult prestoResult = onTrino().executeQuery(prestoSelect + prestoTableName);
assertThat(prestoResult).containsOnly(row);
QueryResult trinoResult = onTrino().executeQuery(trinoSelect + trinoTableName);
assertThat(trinoResult).containsOnly(row);

String sparkTableName = sparkTableName(baseTableName);
String sparkSelect = "SELECT" +
Expand All @@ -371,7 +371,7 @@ public void testSparkReadingNestedPrestoData()
public void testIdBasedFieldMapping()
{
String baseTableName = "test_schema_evolution_for_nested_fields";
String prestoTableName = prestoTableName(baseTableName);
String trinoTableName = trinoTableName(baseTableName);
String sparkTableName = sparkTableName(baseTableName);

onSpark().executeQuery(format(
Expand All @@ -387,7 +387,7 @@ public void testIdBasedFieldMapping()
+ "1001",
sparkTableName));

// Alter nested fields using Spark. Presto does not support this yet.
// Alter nested fields using Spark. Trino does not support this yet.
onSpark().executeQuery(format("ALTER TABLE %s RENAME COLUMN _struct.rename TO renamed", sparkTableName));
onSpark().executeQuery(format("ALTER TABLE %s DROP COLUMN _struct.drop_and_add", sparkTableName));
onSpark().executeQuery(format("ALTER TABLE %s ADD COLUMN _struct.drop_and_add BIGINT", sparkTableName));
Expand All @@ -403,7 +403,7 @@ public void testIdBasedFieldMapping()
.build(),
1001);

QueryResult result = onTrino().executeQuery(format("SELECT * FROM %s", prestoTableName));
QueryResult result = onTrino().executeQuery(format("SELECT * FROM %s", trinoTableName));
assertEquals(result.column(1).get(0), expected.getValues().get(0));
}

Expand All @@ -414,23 +414,23 @@ public void testTrinoShowingSparkCreatedTables()
String trinoTable = "test_table_listing_for_trino";

onSpark().executeQuery(format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG", sparkTableName(sparkTable)));
onTrino().executeQuery(format("CREATE TABLE %s (_integer INTEGER )", prestoTableName(trinoTable)));
onTrino().executeQuery(format("CREATE TABLE %s (_integer INTEGER )", trinoTableName(trinoTable)));

assertThat(onTrino().executeQuery(format("SHOW TABLES FROM %s LIKE '%s'", TEST_SCHEMA_NAME, "test_table_listing_for_%")))
.containsOnly(row(sparkTable), row(trinoTable));

onSpark().executeQuery("DROP TABLE " + sparkTableName(sparkTable));
onTrino().executeQuery("DROP TABLE " + prestoTableName(trinoTable));
onTrino().executeQuery("DROP TABLE " + trinoTableName(trinoTable));
}

// Builds the fully-qualified Spark-side table name (<catalog>.<schema>.<table>)
// using the Spark catalog configured in spark-defaults.conf.
private static String sparkTableName(String tableName)
{
return format("%s.%s.%s", SPARK_CATALOG, TEST_SCHEMA_NAME, tableName);
}

private static String prestoTableName(String tableName)
private static String trinoTableName(String tableName)
{
return format("%s.%s.%s", PRESTO_CATALOG, TEST_SCHEMA_NAME, tableName);
return format("%s.%s.%s", TRINO_CATALOG, TEST_SCHEMA_NAME, tableName);
}

private io.trino.jdbc.Row.Builder rowBuilder()
Expand Down

0 comments on commit adb45f3

Please sign in to comment.