Skip to content

Commit

Permalink
Merge pull request #159 from keboola/jirka-add-test-for-inc-load-with-null
Browse files Browse the repository at this point in the history

TER-71 add test for inc load with null
  • Loading branch information
jirkasemmler authored Nov 30, 2022
2 parents 9c6c256 + 78884b8 commit 9304242
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 47 deletions.
21 changes: 13 additions & 8 deletions src/Backend/Teradata/ToFinalTable/SqlBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public function getInsertAllIntoTargetTableCommand(
);
}
if (in_array($columnDefinition->getColumnName(), $importOptions->getConvertEmptyValuesToNull(), true)) {
// use nullif only for string base type
// use nullif only for string base type -> OM
if ($columnDefinition->getColumnDefinition()->getBasetype() === BaseType::STRING) {
$columnsSetSql[] = sprintf(
'NULLIF(%s, \'\')',
Expand Down Expand Up @@ -252,7 +252,7 @@ public function getDeleteOldItemsCommand(
$destinationTableDefinition->getPrimaryKeysNames(),
$importOptions,
$stagingTable,
'"joined"'
TeradataQuote::quoteSingleIdentifier('joined')
)
);
}
Expand Down Expand Up @@ -313,7 +313,12 @@ public function getUpdateWithPkCommand(
$destinationDefinition->getPrimaryKeysNames()
),
implode(', ', $columnsSet),
$this->getPrimayKeyWhereConditions($destinationDefinition->getPrimaryKeysNames(), $importOptions, $dest)
$this->getPrimayKeyWhereConditions(
$destinationDefinition->getPrimaryKeysNames(),
$importOptions,
TeradataQuote::quoteSingleIdentifier('src'),
$dest
)
);
}

Expand All @@ -323,19 +328,19 @@ public function getUpdateWithPkCommand(
private function getPrimayKeyWhereConditions(
array $primaryKeys,
TeradataImportOptions $importOptions,
string $dest,
string $alias = '"src"'
string $coalescedTable,
string $leftTable
): string {
$pkWhereSql = array_map(function (string $col) use ($importOptions, $dest, $alias) {
$pkWhereSql = array_map(function (string $col) use ($importOptions, $coalescedTable, $leftTable) {
$str = 'TRIM(%s.%s) = COALESCE(TRIM(%s.%s), \'\')';
if (!$importOptions->isNullManipulationEnabled()) {
$str = 'TRIM(%s.%s) = TRIM(%s.%s)';
}
return sprintf(
$str,
$dest,
$leftTable,
TeradataQuote::quoteSingleIdentifier($col),
$alias,
$coalescedTable,
TeradataQuote::quoteSingleIdentifier($col)
);
}, $primaryKeys);
Expand Down
3 changes: 2 additions & 1 deletion tests/data/multi-pk.increment.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
VisitID,Value,MenuItem,Something,Other
134,"nothing","","new","value"
140,"my","ddd","abc","rest"
200,,"ukulele","nova","nova"
200,,"ukulele","nova","nova"
200,,"ukulele","nova","nova"
2 changes: 1 addition & 1 deletion tests/functional/Snowflake/IncrementalImportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public function incrementalImportData(): array
$this->getSimpleIncrementalImportOptions(),
new Storage\Snowflake\Table($this->getDestinationSchemaName(), 'multi-pk'),
$expectedMultiPkRows,
3,
4,
];
return $tests;
}
Expand Down
30 changes: 30 additions & 0 deletions tests/functional/Snowflake/ToFinal/IncrementalImportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public function incrementalImportData(): Generator
{
$accountsStub = $this->getParseCsvStub('expectation.tw_accounts.increment.csv');
$multiPKStub = $this->getParseCsvStub('expectation.multi-pk_not-null.increment.csv');
$multiPKWithNullStub = $this->getParseCsvStub('expectation.multi-pk.increment.csv');

$tests = [];
yield 'simple' => [
Expand Down Expand Up @@ -127,6 +128,35 @@ public function incrementalImportData(): Generator
3,
self::TABLE_MULTI_PK_WITH_TS,
];

yield 'multi pk with null' => [
$this->getSourceInstance(
'multi-pk.csv',
$multiPKWithNullStub->getColumns(),
false,
false,
['VisitID', 'Value', 'MenuItem']
),
new SnowflakeImportOptions(
[],
true, // incremental
false, // disable timestamp
ImportOptions::SKIP_FIRST_LINE
),
$this->getSourceInstance(
'multi-pk.increment.csv',
$multiPKWithNullStub->getColumns(),
false,
false,
['VisitID', 'Value', 'MenuItem']
),
$this->getSnowflakeIncrementalImportOptions(),
[$this->getDestinationSchemaName(), self::TABLE_MULTI_PK_WITH_TS],
$multiPKWithNullStub->getRows(),
4,
self::TABLE_MULTI_PK_WITH_TS,
];

return $tests;
}

Expand Down
2 changes: 1 addition & 1 deletion tests/functional/Synapse/IncrementalImportNoTypesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public function incrementalImportData(): Generator
// Synapse is not comparing empty strings and null same as other backends
// this can't be tested as dedup in full import is not deterministic, so we test only expected row count
6,
3,
4,
[self::TABLE_MULTI_PK],
];
return $tests;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public function incrementalImportData(): Generator
// Synapse is not comparing empty strings and null same as other backends
// this can't be tested as dedup in full import is not deterministic, so we test only expected row count
6,
3,
4,
[self::TABLE_MULTI_PK],
];
return $tests;
Expand Down
37 changes: 37 additions & 0 deletions tests/functional/Teradata/ToFinal/IncrementalImportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ public function incrementalImportData(): Generator
$multiPkColumns = array_shift($expectedMultiPkRows);
$expectedMultiPkRows = array_values($expectedMultiPkRows);

// multi pk nulls
$expectationMultiPkNullFile = new CsvFile(self::DATA_DIR . 'expectation.multi-pk.increment.csv');
$expectedMultiPkNullRows = [];
foreach ($expectationMultiPkNullFile as $row) {
$expectedMultiPkNullRows[] = $row;
}
/** @var string[] $multiPkColumns */
$multiPkColumns = array_shift($expectedMultiPkNullRows);
$expectedMultiPkNullRows = array_values($expectedMultiPkNullRows);

$multiPkExpectationsWithoutPKFile = new CsvFile(self::DATA_DIR . 'multi-pk.csv');
$multiPkExpectationsWithoutPKRows = [];
foreach ($multiPkExpectationsWithoutPKFile as $row) {
Expand Down Expand Up @@ -159,6 +169,33 @@ public function incrementalImportData(): Generator
3,
self::TABLE_MULTI_PK_WITH_TS,
];
yield 'multi pk with null' => [
$this->getSourceInstance(
'multi-pk.csv',
$multiPkColumns,
false,
false,
['VisitID', 'Value', 'MenuItem']
),
$this->getImportOptions(
[],
false,
true, // disable timestamp
ImportOptions::SKIP_FIRST_LINE
),
$this->getSourceInstance(
'multi-pk.increment.csv',
$multiPkColumns,
false,
false,
['VisitID', 'Value', 'MenuItem']
),
$this->getTeradataIncrementalImportOptions(),
[$this->getDestinationSchemaName(), self::TABLE_MULTI_PK_WITH_TS],
$expectedMultiPkNullRows,
4,
self::TABLE_MULTI_PK_WITH_TS,
];
yield 'no pk' => [
$this->getSourceInstance(
'multi-pk.csv',
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/Teradata/ToFinal/SqlBuilderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ public function testGetDeleteOldItemsCommand(): void

$expectedSql = sprintf(
// phpcs:ignore
'DELETE %s FROM %s AS "joined" WHERE TRIM(%s."pk1") = COALESCE(TRIM("joined"."pk1"), \'\') AND TRIM(%s."pk2") = COALESCE(TRIM("joined"."pk2"), \'\')',
'DELETE %s FROM %s AS "joined" WHERE TRIM("joined"."pk1") = COALESCE(TRIM(%s."pk1"), \'\') AND TRIM("joined"."pk2") = COALESCE(TRIM(%s."pk2"), \'\')',
$stagingTable,
$storageTable,
$stagingTable,
Expand Down Expand Up @@ -1018,7 +1018,7 @@ public function testGetDeleteOldItemsCommandRequireSameTables(): void

$expectedSql = sprintf(
// phpcs:ignore
'DELETE %s FROM %s AS "joined" WHERE TRIM(%s."pk1") = TRIM("joined"."pk1") AND TRIM(%s."pk2") = TRIM("joined"."pk2")',
'DELETE %s FROM %s AS "joined" WHERE TRIM("joined"."pk1") = TRIM(%s."pk1") AND TRIM("joined"."pk2") = TRIM(%s."pk2")',
$stagingTable,
$storageTable,
$stagingTable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,14 @@ public function testGetCopyCommands(): void
EOT
);

$conn->expects(self::once())->method('fetchOne')
->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"')
->willReturn(10);
$conn->expects(self::once())->method('fetchAllAssociative')
// phpcs:ignore
->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';")
->willReturn([
[
'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 10,
],
]);

$destination = new SnowflakeTableDefinition(
'schema',
Expand Down Expand Up @@ -78,9 +83,14 @@ public function testGetCopyCommandsRowSkip(): void
);
// @codingStandardsIgnoreEnd

$conn->expects(self::once())->method('fetchOne')
->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"')
->willReturn(7);
$conn->expects(self::once())->method('fetchAllAssociative')
// phpcs:ignore
->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';")
->willReturn([
[
'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 7,
],
]);

$destination = new SnowflakeTableDefinition(
'schema',
Expand Down Expand Up @@ -129,9 +139,14 @@ public function testGetCopyCommandWithMoreChunksOfFiles(): void
$q2 = sprintf($qTemplate, implode(', ', array_slice($entriesWithoutBucket, 1000, 5)));
$conn->expects(self::exactly(2))->method('executeStatement')->withConsecutive([$q1], [$q2]);

$conn->expects(self::once())->method('fetchOne')
->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"')
->willReturn(7);
$conn->expects(self::once())->method('fetchAllAssociative')
// phpcs:ignore
->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';")
->willReturn([
[
'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 7,
],
]);

$destination = new SnowflakeTableDefinition(
'schema',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@ public function testGetCopyCommands(): void
EOT
);

$conn->expects(self::once())->method('fetchOne')
->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"')
->willReturn(10);
$conn->expects(self::once())->method('fetchAllAssociative')
// phpcs:ignore
->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';")
->willReturn([
[
'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 10,
],
]);

$destination = new SnowflakeTableDefinition(
'schema',
Expand Down Expand Up @@ -86,9 +91,14 @@ public function testGetCopyCommandsRowSkip(): void
EOT
);

$conn->expects(self::once())->method('fetchOne')
->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"')
->willReturn(7);
$conn->expects(self::once())->method('fetchAllAssociative')
// phpcs:ignore
->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';")
->willReturn([
[
'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 7,
],
]);

$destination = new SnowflakeTableDefinition(
'schema',
Expand Down Expand Up @@ -143,9 +153,14 @@ public function testGetCopyCommandWithMoreChunksOfFiles(): void
$q2 = sprintf($qTemplate, implode(', ', array_slice($entriesWithoutBucket, 1000, 5)));
$conn->expects(self::exactly(2))->method('executeStatement')->withConsecutive([$q1], [$q2]);

$conn->expects(self::once())->method('fetchOne')
->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"')
->willReturn(7);
$conn->expects(self::once())->method('fetchAllAssociative')
// phpcs:ignore
->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';")
->willReturn([
[
'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 7,
],
]);

$destination = new SnowflakeTableDefinition(
'schema',
Expand Down
33 changes: 24 additions & 9 deletions tests/unit/Backend/Snowflake/ToStage/FromS3CopyIntoAdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@ public function testGetCopyCommands(): void
EOT
);

$conn->expects(self::once())->method('fetchOne')
->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"')
->willReturn(10);
$conn->expects(self::once())->method('fetchAllAssociative')
// phpcs:ignore
->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';")
->willReturn([
[
'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 10,
],
]);

$destination = new SnowflakeTableDefinition(
'schema',
Expand Down Expand Up @@ -86,9 +91,14 @@ public function testGetCopyCommandsRowSkip(): void
EOT
);

$conn->expects(self::once())->method('fetchOne')
->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"')
->willReturn(7);
$conn->expects(self::once())->method('fetchAllAssociative')
// phpcs:ignore
->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';")
->willReturn([
[
'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 7,
],
]);

$destination = new SnowflakeTableDefinition(
'schema',
Expand Down Expand Up @@ -143,9 +153,14 @@ public function testGetCopyCommandWithMoreChunksOfFiles(): void
$q2 = sprintf($qTemplate, implode(', ', array_slice($entriesWithoutBucket, 1000, 5)));
$conn->expects(self::exactly(2))->method('executeStatement')->withConsecutive([$q1], [$q2]);

$conn->expects(self::once())->method('fetchOne')
->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"')
->willReturn(7);
$conn->expects(self::once())->method('fetchAllAssociative')
// phpcs:ignore
->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';")
->willReturn([
[
'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 7,
],
]);

$destination = new SnowflakeTableDefinition(
'schema',
Expand Down
Loading

0 comments on commit 9304242

Please sign in to comment.