diff --git a/src/Backend/Teradata/ToFinalTable/SqlBuilder.php b/src/Backend/Teradata/ToFinalTable/SqlBuilder.php index 6d9a5cbd..30e4e1ee 100644 --- a/src/Backend/Teradata/ToFinalTable/SqlBuilder.php +++ b/src/Backend/Teradata/ToFinalTable/SqlBuilder.php @@ -97,7 +97,7 @@ public function getInsertAllIntoTargetTableCommand( ); } if (in_array($columnDefinition->getColumnName(), $importOptions->getConvertEmptyValuesToNull(), true)) { - // use nullif only for string base type + // use nullif only for string base type -> OM if ($columnDefinition->getColumnDefinition()->getBasetype() === BaseType::STRING) { $columnsSetSql[] = sprintf( 'NULLIF(%s, \'\')', @@ -252,7 +252,7 @@ public function getDeleteOldItemsCommand( $destinationTableDefinition->getPrimaryKeysNames(), $importOptions, $stagingTable, - '"joined"' + TeradataQuote::quoteSingleIdentifier('joined') ) ); } @@ -313,7 +313,12 @@ public function getUpdateWithPkCommand( $destinationDefinition->getPrimaryKeysNames() ), implode(', ', $columnsSet), - $this->getPrimayKeyWhereConditions($destinationDefinition->getPrimaryKeysNames(), $importOptions, $dest) + $this->getPrimayKeyWhereConditions( + $destinationDefinition->getPrimaryKeysNames(), + $importOptions, + TeradataQuote::quoteSingleIdentifier('src'), + $dest + ) ); } @@ -323,19 +328,19 @@ public function getUpdateWithPkCommand( private function getPrimayKeyWhereConditions( array $primaryKeys, TeradataImportOptions $importOptions, - string $dest, - string $alias = '"src"' + string $coalescedTable, + string $leftTable ): string { - $pkWhereSql = array_map(function (string $col) use ($importOptions, $dest, $alias) { + $pkWhereSql = array_map(function (string $col) use ($importOptions, $coalescedTable, $leftTable) { $str = 'TRIM(%s.%s) = COALESCE(TRIM(%s.%s), \'\')'; if (!$importOptions->isNullManipulationEnabled()) { $str = 'TRIM(%s.%s) = TRIM(%s.%s)'; } return sprintf( $str, - $dest, + $leftTable, TeradataQuote::quoteSingleIdentifier($col), - $alias, + $coalescedTable, TeradataQuote::quoteSingleIdentifier($col) ); }, $primaryKeys); diff --git a/tests/data/multi-pk.increment.csv b/tests/data/multi-pk.increment.csv index 485589f0..ee1c5a79 100644 --- a/tests/data/multi-pk.increment.csv +++ b/tests/data/multi-pk.increment.csv @@ -1,4 +1,5 @@ VisitID,Value,MenuItem,Something,Other 134,"nothing","","new","value" 140,"my","ddd","abc","rest" -200,,"ukulele","nova","nova" \ No newline at end of file +200,,"ukulele","nova","nova" +200,,"ukulele","nova","nova" diff --git a/tests/functional/Snowflake/IncrementalImportTest.php b/tests/functional/Snowflake/IncrementalImportTest.php index 9cca7897..434eb83a 100644 --- a/tests/functional/Snowflake/IncrementalImportTest.php +++ b/tests/functional/Snowflake/IncrementalImportTest.php @@ -70,7 +70,7 @@ public function incrementalImportData(): array $this->getSimpleIncrementalImportOptions(), new Storage\Snowflake\Table($this->getDestinationSchemaName(), 'multi-pk'), $expectedMultiPkRows, - 3, + 4, ]; return $tests; } diff --git a/tests/functional/Snowflake/ToFinal/IncrementalImportTest.php b/tests/functional/Snowflake/ToFinal/IncrementalImportTest.php index 52cdc702..1ebf978e 100644 --- a/tests/functional/Snowflake/ToFinal/IncrementalImportTest.php +++ b/tests/functional/Snowflake/ToFinal/IncrementalImportTest.php @@ -49,6 +49,7 @@ public function incrementalImportData(): Generator { $accountsStub = $this->getParseCsvStub('expectation.tw_accounts.increment.csv'); $multiPKStub = $this->getParseCsvStub('expectation.multi-pk_not-null.increment.csv'); + $multiPKWithNullStub = $this->getParseCsvStub('expectation.multi-pk.increment.csv'); $tests = []; yield 'simple' => [ @@ -127,6 +128,35 @@ public function incrementalImportData(): Generator 3, self::TABLE_MULTI_PK_WITH_TS, ]; + + yield 'multi pk with null' => [ + $this->getSourceInstance( + 'multi-pk.csv', + $multiPKWithNullStub->getColumns(), + false, + false, + ['VisitID', 'Value', 'MenuItem'] + ), + new SnowflakeImportOptions( + [], + true, // incremental + false, // disable timestamp + ImportOptions::SKIP_FIRST_LINE + ), + $this->getSourceInstance( + 'multi-pk.increment.csv', + $multiPKWithNullStub->getColumns(), + false, + false, + ['VisitID', 'Value', 'MenuItem'] + ), + $this->getSnowflakeIncrementalImportOptions(), + [$this->getDestinationSchemaName(), self::TABLE_MULTI_PK_WITH_TS], + $multiPKWithNullStub->getRows(), + 4, + self::TABLE_MULTI_PK_WITH_TS, + ]; + return $tests; } diff --git a/tests/functional/Synapse/IncrementalImportNoTypesTest.php b/tests/functional/Synapse/IncrementalImportNoTypesTest.php index c07f6f3c..9e1f5144 100644 --- a/tests/functional/Synapse/IncrementalImportNoTypesTest.php +++ b/tests/functional/Synapse/IncrementalImportNoTypesTest.php @@ -126,7 +126,7 @@ public function incrementalImportData(): Generator // Synapse is not comparing empty strings and null same as other backends // this can't be tested as dedup in full import is not deterministic, so we test only expected row count 6, - 3, + 4, [self::TABLE_MULTI_PK], ]; return $tests; diff --git a/tests/functional/Synapse/IncrementalImportWithTypesTest.php b/tests/functional/Synapse/IncrementalImportWithTypesTest.php index 75b0e392..820d5703 100644 --- a/tests/functional/Synapse/IncrementalImportWithTypesTest.php +++ b/tests/functional/Synapse/IncrementalImportWithTypesTest.php @@ -182,7 +182,7 @@ public function incrementalImportData(): Generator // Synapse is not comparing empty strings and null same as other backends // this can't be tested as dedup in full import is not deterministic, so we test only expected row count 6, - 3, + 4, [self::TABLE_MULTI_PK], ]; return $tests; diff --git a/tests/functional/Teradata/ToFinal/IncrementalImportTest.php b/tests/functional/Teradata/ToFinal/IncrementalImportTest.php index 7f127ab9..441e42d8 100644 --- a/tests/functional/Teradata/ToFinal/IncrementalImportTest.php +++ b/tests/functional/Teradata/ToFinal/IncrementalImportTest.php @@ -69,6 +69,16 @@ public function incrementalImportData(): Generator $multiPkColumns = array_shift($expectedMultiPkRows); $expectedMultiPkRows = array_values($expectedMultiPkRows); + // multi pk nulls + $expectationMultiPkNullFile = new CsvFile(self::DATA_DIR . 'expectation.multi-pk.increment.csv'); + $expectedMultiPkNullRows = []; + foreach ($expectationMultiPkNullFile as $row) { + $expectedMultiPkNullRows[] = $row; + } + /** @var string[] $multiPkColumns */ + $multiPkColumns = array_shift($expectedMultiPkNullRows); + $expectedMultiPkNullRows = array_values($expectedMultiPkNullRows); + $multiPkExpectationsWithoutPKFile = new CsvFile(self::DATA_DIR . 'multi-pk.csv'); $multiPkExpectationsWithoutPKRows = []; foreach ($multiPkExpectationsWithoutPKFile as $row) { @@ -159,6 +169,33 @@ public function incrementalImportData(): Generator 3, self::TABLE_MULTI_PK_WITH_TS, ]; + yield 'multi pk with null' => [ + $this->getSourceInstance( + 'multi-pk.csv', + $multiPkColumns, + false, + false, + ['VisitID', 'Value', 'MenuItem'] + ), + $this->getImportOptions( + [], + false, + true, // disable timestamp + ImportOptions::SKIP_FIRST_LINE + ), + $this->getSourceInstance( + 'multi-pk.increment.csv', + $multiPkColumns, + false, + false, + ['VisitID', 'Value', 'MenuItem'] + ), + $this->getTeradataIncrementalImportOptions(), + [$this->getDestinationSchemaName(), self::TABLE_MULTI_PK_WITH_TS], + $expectedMultiPkNullRows, + 4, + self::TABLE_MULTI_PK_WITH_TS, + ]; yield 'no pk' => [ $this->getSourceInstance( 'multi-pk.csv', diff --git a/tests/functional/Teradata/ToFinal/SqlBuilderTest.php b/tests/functional/Teradata/ToFinal/SqlBuilderTest.php index 6ec7b728..ab7c1c2e 100644 --- a/tests/functional/Teradata/ToFinal/SqlBuilderTest.php +++ b/tests/functional/Teradata/ToFinal/SqlBuilderTest.php @@ -909,7 +909,7 @@ public function testGetDeleteOldItemsCommand(): void $expectedSql = sprintf( // phpcs:ignore - 'DELETE %s FROM %s AS "joined" WHERE TRIM(%s."pk1") = COALESCE(TRIM("joined"."pk1"), \'\') AND TRIM(%s."pk2") = COALESCE(TRIM("joined"."pk2"), \'\')', + 'DELETE %s FROM %s AS "joined" WHERE TRIM("joined"."pk1") = COALESCE(TRIM(%s."pk1"), \'\') AND TRIM("joined"."pk2") = COALESCE(TRIM(%s."pk2"), \'\')', $stagingTable, $storageTable, $stagingTable, @@ -1018,7 +1018,7 @@ public function testGetDeleteOldItemsCommandRequireSameTables(): void $expectedSql = sprintf( // phpcs:ignore - 'DELETE %s FROM %s AS "joined" WHERE TRIM(%s."pk1") = TRIM("joined"."pk1") AND TRIM(%s."pk2") = TRIM("joined"."pk2")', + 'DELETE %s FROM %s AS "joined" WHERE TRIM("joined"."pk1") = TRIM(%s."pk1") AND TRIM("joined"."pk2") = TRIM(%s."pk2")', $stagingTable, $storageTable, $stagingTable, diff --git a/tests/unit/Backend/Snowflake/ToStage/FromABSCopyIntoAdapterTest.php b/tests/unit/Backend/Snowflake/ToStage/FromABSCopyIntoAdapterTest.php index 6e37546e..2b068f63 100644 --- a/tests/unit/Backend/Snowflake/ToStage/FromABSCopyIntoAdapterTest.php +++ b/tests/unit/Backend/Snowflake/ToStage/FromABSCopyIntoAdapterTest.php @@ -36,9 +36,14 @@ public function testGetCopyCommands(): void EOT ); - $conn->expects(self::once())->method('fetchOne') - ->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"') - ->willReturn(10); + $conn->expects(self::once())->method('fetchAllAssociative') + // phpcs:ignore + ->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';") + ->willReturn([ + [ + 'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 10, + ], + ]); $destination = new SnowflakeTableDefinition( 'schema', @@ -78,9 +83,14 @@ public function testGetCopyCommandsRowSkip(): void ); // @codingStandardsIgnoreEnd - $conn->expects(self::once())->method('fetchOne') - ->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"') - ->willReturn(7); + $conn->expects(self::once())->method('fetchAllAssociative') + // phpcs:ignore + ->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';") + ->willReturn([ + [ + 'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 7, + ], + ]); $destination = new SnowflakeTableDefinition( 'schema', @@ -129,9 +139,14 @@ public function testGetCopyCommandWithMoreChunksOfFiles(): void $q2 = sprintf($qTemplate, implode(', ', array_slice($entriesWithoutBucket, 1000, 5))); $conn->expects(self::exactly(2))->method('executeStatement')->withConsecutive([$q1], [$q2]); - $conn->expects(self::once())->method('fetchOne') - ->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"') - ->willReturn(7); + $conn->expects(self::once())->method('fetchAllAssociative') + // phpcs:ignore + ->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';") + ->willReturn([ + [ + 'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 7, + ], + ]); $destination = new SnowflakeTableDefinition( 'schema', diff --git a/tests/unit/Backend/Snowflake/ToStage/FromGCSCopyIntoAdapterTest.php b/tests/unit/Backend/Snowflake/ToStage/FromGCSCopyIntoAdapterTest.php index f5cd635f..c4ed7842 100644 --- a/tests/unit/Backend/Snowflake/ToStage/FromGCSCopyIntoAdapterTest.php +++ b/tests/unit/Backend/Snowflake/ToStage/FromGCSCopyIntoAdapterTest.php @@ -41,9 +41,14 @@ public function testGetCopyCommands(): void EOT ); - $conn->expects(self::once())->method('fetchOne') - ->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"') - ->willReturn(10); + $conn->expects(self::once())->method('fetchAllAssociative') + // phpcs:ignore + ->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';") + ->willReturn([ + [ + 'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 10, + ], + ]); $destination = new SnowflakeTableDefinition( 'schema', @@ -86,9 +91,14 @@ public function testGetCopyCommandsRowSkip(): void EOT ); - $conn->expects(self::once())->method('fetchOne') - ->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"') - ->willReturn(7); + $conn->expects(self::once())->method('fetchAllAssociative') + // phpcs:ignore + ->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';") + ->willReturn([ + [ + 'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 7, + ], + ]); $destination = new SnowflakeTableDefinition( 'schema', @@ -143,9 +153,14 @@ public function testGetCopyCommandWithMoreChunksOfFiles(): void $q2 = sprintf($qTemplate, implode(', ', array_slice($entriesWithoutBucket, 1000, 5))); $conn->expects(self::exactly(2))->method('executeStatement')->withConsecutive([$q1], [$q2]); - $conn->expects(self::once())->method('fetchOne') - ->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"') - ->willReturn(7); + $conn->expects(self::once())->method('fetchAllAssociative') + // phpcs:ignore + ->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';") + ->willReturn([ + [ + 'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 7, + ], + ]); $destination = new SnowflakeTableDefinition( 'schema', diff --git a/tests/unit/Backend/Snowflake/ToStage/FromS3CopyIntoAdapterTest.php b/tests/unit/Backend/Snowflake/ToStage/FromS3CopyIntoAdapterTest.php index 99e1c3ee..ead75253 100644 --- a/tests/unit/Backend/Snowflake/ToStage/FromS3CopyIntoAdapterTest.php +++ b/tests/unit/Backend/Snowflake/ToStage/FromS3CopyIntoAdapterTest.php @@ -41,9 +41,14 @@ public function testGetCopyCommands(): void EOT ); - $conn->expects(self::once())->method('fetchOne') - ->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"') - ->willReturn(10); + $conn->expects(self::once())->method('fetchAllAssociative') + // phpcs:ignore + ->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';") + ->willReturn([ + [ + 'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 10, + ], + ]); $destination = new SnowflakeTableDefinition( 'schema', @@ -86,9 +91,14 @@ public function testGetCopyCommandsRowSkip(): void EOT ); - $conn->expects(self::once())->method('fetchOne') - ->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"') - ->willReturn(7); + $conn->expects(self::once())->method('fetchAllAssociative') + // phpcs:ignore + ->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';") + ->willReturn([ + [ + 'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 7, + ], + ]); $destination = new SnowflakeTableDefinition( 'schema', @@ -143,9 +153,14 @@ public function testGetCopyCommandWithMoreChunksOfFiles(): void $q2 = sprintf($qTemplate, implode(', ', array_slice($entriesWithoutBucket, 1000, 5))); $conn->expects(self::exactly(2))->method('executeStatement')->withConsecutive([$q1], [$q2]); - $conn->expects(self::once())->method('fetchOne') - ->with('SELECT COUNT(*) AS NumberOfRows FROM "schema"."stagingTable"') - ->willReturn(7); + $conn->expects(self::once())->method('fetchAllAssociative') + // phpcs:ignore + ->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'schema' AND TABLE_NAME = 'stagingTable';") + ->willReturn([ + [ + 'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 7, + ], + ]); $destination = new SnowflakeTableDefinition( 'schema', diff --git a/tests/unit/Backend/Snowflake/ToStage/FromTableInsertIntoAdapterTest.php b/tests/unit/Backend/Snowflake/ToStage/FromTableInsertIntoAdapterTest.php index 7fdb986c..8bcab08e 100644 --- a/tests/unit/Backend/Snowflake/ToStage/FromTableInsertIntoAdapterTest.php +++ b/tests/unit/Backend/Snowflake/ToStage/FromTableInsertIntoAdapterTest.php @@ -26,9 +26,14 @@ public function testGetCopyCommands(): void // phpcs:ignore 'INSERT INTO "test_schema"."stagingTable" ("col1", "col2") SELECT "col1", "col2" FROM "test_schema"."test_table"' ); - $conn->expects(self::once())->method('fetchOne') - ->with('SELECT COUNT(*) AS NumberOfRows FROM "test_schema"."stagingTable"') - ->willReturn(10); + $conn->expects(self::once())->method('fetchAllAssociative') + // phpcs:ignore + ->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'test_schema' AND TABLE_NAME = 'stagingTable';") + ->willReturn([ + [ + 'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 10, + ], + ]); $destination = new SnowflakeTableDefinition( 'test_schema', @@ -68,9 +73,14 @@ public function testGetCopyCommandsSelectSource(): void ['bind' => 'val'], [1] ); - $conn->expects(self::once())->method('fetchOne') - ->with('SELECT COUNT(*) AS NumberOfRows FROM "test_schema"."stagingTable"') - ->willReturn(10); + $conn->expects(self::once())->method('fetchAllAssociative') + // phpcs:ignore + ->with("SELECT TABLE_TYPE,BYTES,ROW_COUNT FROM information_schema.tables WHERE TABLE_SCHEMA = 'test_schema' AND TABLE_NAME = 'stagingTable';") + ->willReturn([ + [ + 'TABLE_TYPE' => 'BASE TABLE', 'BYTES' => 0, 'ROW_COUNT' => 10, + ], + ]); $destination = new SnowflakeTableDefinition( 'test_schema',