Skip to content

Commit

Permalink
Merge pull request #88 from keboola/zajca-fix-replicate-stage-table
Browse files Browse the repository at this point in the history
fix replicate stage table
  • Loading branch information
zajca authored Dec 7, 2021
2 parents c0fa32c + 8c05d28 commit 74db52f
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 3 deletions.
34 changes: 33 additions & 1 deletion src/Backend/Synapse/ToFinalTable/SqlBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Keboola\Datatype\Definition\BaseType;
use Keboola\Datatype\Definition\Synapse;
use Keboola\Db\Import\Exception;
use Keboola\Db\ImportExport\Backend\Synapse\SynapseImportOptions;
use Keboola\Db\ImportExport\Backend\ToStageImporterInterface;
use Keboola\TableBackendUtils\Column\SynapseColumn;
Expand Down Expand Up @@ -539,14 +540,45 @@ private function getCTASColumnsSetSql(
/** @var SynapseColumn $column */
foreach ($sourceTableDefinition->getColumnsDefinitions() as $column) {
$destinationColumn = null;
// case sensitive search
/** @var SynapseColumn $col */
foreach ($destinationTableDefinition->getColumnsDefinitions() as $col) {
if ($col->getColumnName() === $column->getColumnName()) {
$destinationColumn = $col;
break;
}
}
assert($destinationColumn !== null);
if ($destinationColumn === null) {
// case insensitive search
/** @var SynapseColumn $col */
foreach ($destinationTableDefinition->getColumnsDefinitions() as $col) {
if (strtolower($col->getColumnName()) === strtolower($column->getColumnName())) {
if ($destinationColumn !== null) {
throw new Exception(
sprintf(
// phpcs:ignore
'Multiple columns "%s, %s" exists with same name but non exactly match expected "%s".',
$destinationColumn->getColumnName(),
$col->getColumnName(),
$column->getColumnName()
),
Exception::UNKNOWN_ERROR
);
}
$destinationColumn = $col;
}
}
}
if ($destinationColumn === null) {
throw new Exception(
sprintf(
'Columns "%s" can be imported as it was not found between columns "%s" of destination table.',
$column->getColumnName(),
implode(', ', $destinationTableDefinition->getColumnsNames())
),
Exception::UNKNOWN_ERROR
);
}
$columnTypeDefinition = $this->getColumnTypeSqlDefinition($destinationColumn);
if (in_array($column->getColumnName(), $importOptions->getConvertEmptyValuesToNull(), true)) {
$columnsSetSql[] = $this->getCTASColumnSetSQLWithConvertEmptyValues(
Expand Down
17 changes: 15 additions & 2 deletions src/Backend/Synapse/ToStage/StageTableDefinitionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Keboola\Db\ImportExport\Backend\Synapse\Helper\BackendHelper;
use Keboola\TableBackendUtils\Column\ColumnCollection;
use Keboola\TableBackendUtils\Column\SynapseColumn;
use Keboola\TableBackendUtils\Table\Synapse\TableDistributionDefinition;
use Keboola\TableBackendUtils\Table\Synapse\TableIndexDefinition;
use Keboola\TableBackendUtils\Table\SynapseTableDefinition;

Expand Down Expand Up @@ -59,7 +60,7 @@ public static function createStagingTableDefinition(
true,
new ColumnCollection($newDefinitions),
$destination->getPrimaryKeysNames(),
$destination->getTableDistribution(),
self::resolveDistribution($destination->getTableDistribution()),
$indexDefinition ?? new TableIndexDefinition(TableIndexDefinition::TABLE_INDEX_TYPE_HEAP)
);
}
Expand Down Expand Up @@ -105,7 +106,7 @@ public static function createStagingTableDefinitionWithText(
true,
new ColumnCollection($newDefinitions),
$destination->getPrimaryKeysNames(),
$destination->getTableDistribution(),
self::resolveDistribution($destination->getTableDistribution()),
$indexDefinition ?? new TableIndexDefinition(TableIndexDefinition::TABLE_INDEX_TYPE_HEAP)
);
}
Expand All @@ -122,4 +123,16 @@ private static function getClusteredIndexColumns(?TableIndexDefinition $indexDef
}
return null;
}

/**
 * Chooses the distribution used for a staging table derived from a destination table.
 *
 * A REPLICATE distribution is replaced by a fresh ROUND_ROBIN definition; every
 * other distribution is handed back as the same instance that was passed in.
 *
 * @param TableDistributionDefinition $distribution distribution read from the destination table
 * @return TableDistributionDefinition distribution to apply to the staging table
 */
private static function resolveDistribution(
    TableDistributionDefinition $distribution
): TableDistributionDefinition {
    $distributionName = $distribution->getDistributionName();
    if ($distributionName !== TableDistributionDefinition::TABLE_DISTRIBUTION_REPLICATE) {
        return $distribution;
    }

    // Temp tables can't be REPLICATE, so fall back to ROUND_ROBIN for them.
    return new TableDistributionDefinition(TableDistributionDefinition::TABLE_DISTRIBUTION_ROUND_ROBIN);
}
}
101 changes: 101 additions & 0 deletions tests/functional/SynapseNext/SqlBuilderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use DateTime;
use Keboola\Datatype\Definition\Synapse;
use Keboola\Db\Import\Exception;
use Keboola\Db\ImportExport\Backend\Snowflake\Helper\DateTimeHelper;
use Keboola\Db\ImportExport\Backend\Synapse\ToFinalTable\SqlBuilder;
use Keboola\Db\ImportExport\Backend\Synapse\SynapseImportOptions;
Expand Down Expand Up @@ -1023,6 +1024,106 @@ public function testTransaction(): void
$this->connection->exec($sql);
}

/**
 * Supplies the names of the builder methods that the CTAS failure tests invoke dynamically.
 *
 * Each data set is keyed by the method name and carries that name as its single argument.
 * The provider is static because it reads no instance state, and non-static data
 * providers are deprecated in PHPUnit 10 and unsupported in PHPUnit 11.
 *
 * @return \Generator<string, array{string}>
 */
public static function ctasFunctionsProvider(): \Generator
{
    $methodNames = [
        'getCtasDedupCommand',
        'getCTASInsertAllIntoTargetTableCommand',
    ];

    foreach ($methodNames as $methodName) {
        yield $methodName => [$methodName];
    }
}

/**
 * The builder method under test must throw when the staging table has a column
 * ("pk2") that has no counterpart among the destination columns ("pk1", "notExists").
 *
 * @dataProvider ctasFunctionsProvider
 */
public function testCtasCommandFailOnMissingColumns(string $function): void
{
    // staging side: contains "pk2", which the destination below lacks
    $stagingColumns = new ColumnCollection([
        $this->createGenericColumn('pk1'),
        $this->createGenericColumn('pk2'),
    ]);
    $stagingTable = new SynapseTableDefinition(
        self::TEST_SCHEMA,
        self::TEST_STAGING_TABLE,
        true,
        $stagingColumns,
        [],
        new TableDistributionDefinition(TableDistributionDefinition::TABLE_DISTRIBUTION_ROUND_ROBIN),
        new TableIndexDefinition(TableIndexDefinition::TABLE_INDEX_TYPE_HEAP)
    );

    // destination side: reuses the distribution and index of the staging table
    $destinationColumns = new ColumnCollection([
        $this->createGenericColumn('pk1'),
        $this->createGenericColumn('notExists'),
    ]);
    $destinationTable = new SynapseTableDefinition(
        'schema',
        'tableDest',
        false,
        $destinationColumns,
        ['pk1'],
        $stagingTable->getTableDistribution(),
        $stagingTable->getTableIndex()
    );

    // expectations have to be registered before the failing call
    $this->expectException(Exception::class);
    $this->expectExceptionMessage(
        'Columns "pk2" can be imported as it was not found between columns "pk1, notExists" of destination table.'
    );

    $builder = $this->getBuilder();
    $builder->{$function}(
        $stagingTable,
        $destinationTable,
        new SynapseImportOptions(),
        '2020-01-01 00:00:00'
    );
}

/**
 * The builder method under test must throw when the staging column "pk2" has no
 * exact match in the destination but several columns differing only in letter
 * case ("PK2", "Pk2").
 *
 * @dataProvider ctasFunctionsProvider
 */
public function testCtasCommandFailOnMultipleColumns(string $function): void
{
    $stagingColumns = new ColumnCollection([
        $this->createGenericColumn('pk1'),
        $this->createGenericColumn('pk2'),
    ]);
    $stagingTable = new SynapseTableDefinition(
        self::TEST_SCHEMA,
        self::TEST_STAGING_TABLE,
        true,
        $stagingColumns,
        [],
        new TableDistributionDefinition(TableDistributionDefinition::TABLE_DISTRIBUTION_ROUND_ROBIN),
        new TableIndexDefinition(TableIndexDefinition::TABLE_INDEX_TYPE_HEAP)
    );

    // two destination columns collide with "pk2" once letter case is ignored
    $destinationColumns = new ColumnCollection([
        $this->createGenericColumn('pk1'),
        $this->createGenericColumn('PK2'),
        $this->createGenericColumn('Pk2'),
    ]);
    $destinationTable = new SynapseTableDefinition(
        'schema',
        'tableDest',
        false,
        $destinationColumns,
        ['pk1'],
        $stagingTable->getTableDistribution(),
        $stagingTable->getTableIndex()
    );

    // expectations have to be registered before the failing call
    $this->expectException(Exception::class);
    $this->expectExceptionMessage(
        'Multiple columns "PK2, Pk2" exists with same name but non exactly match expected "pk2".'
    );

    $builder = $this->getBuilder();
    $builder->{$function}(
        $stagingTable,
        $destinationTable,
        new SynapseImportOptions(),
        '2020-01-01 00:00:00'
    );
}

/**
* @dataProvider ctasDedupProvider
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,49 @@ public function testCreateStagingTableDefinition(): void
);
}

/**
 * A destination table with REPLICATE distribution must yield a staging
 * definition that uses ROUND_ROBIN distribution instead.
 */
public function testCreateStagingTableDefinitionReplicate(): void
{
    $destination = new SynapseTableDefinition(
        'schema',
        'table',
        false,
        new ColumnCollection([
            new SynapseColumn('name', new Synapse(Synapse::TYPE_DATE)),
            SynapseColumn::createGenericColumn('id'),
        ]),
        [],
        new TableDistributionDefinition(TableDistributionDefinition::TABLE_DISTRIBUTION_REPLICATE),
        new TableIndexDefinition(TableIndexDefinition::TABLE_INDEX_TYPE_HEAP)
    );
    $sourceColumns = ['id', 'name', 'notInDef'];

    $staging = StageTableDefinitionFactory::createStagingTableDefinition($destination, $sourceColumns);

    self::assertSame('schema', $staging->getSchemaName());
    self::assertStringStartsWith('#__temp_csvimport', $staging->getTableName());
    self::assertTrue($staging->isTemporary());
    // column order follows the source columns, not the destination definition
    self::assertSame($sourceColumns, $staging->getColumnsNames());

    // expected type per position, in source column order
    $expectedTypes = [
        Synapse::TYPE_NVARCHAR, // id is NVARCHAR
        Synapse::TYPE_DATE,     // name is DATE
        Synapse::TYPE_NVARCHAR, // notInDef has default NVARCHAR
    ];
    /** @var SynapseColumn[] $stagingColumns */
    $stagingColumns = iterator_to_array($staging->getColumnsDefinitions());
    foreach ($expectedTypes as $position => $expectedType) {
        self::assertSame(
            $expectedType,
            $stagingColumns[$position]->getColumnDefinition()->getType()
        );
    }

    // REPLICATE from the destination is expected to come out as ROUND_ROBIN
    $distribution = $staging->getTableDistribution();
    self::assertSame(
        TableDistributionDefinition::TABLE_DISTRIBUTION_ROUND_ROBIN,
        $distribution->getDistributionName()
    );

    // index is heap
    $index = $staging->getTableIndex();
    self::assertSame(
        TableIndexDefinition::TABLE_INDEX_TYPE_HEAP,
        $index->getIndexType()
    );
}

public function testCreateStagingTableDefinitionWithClusteredColumnstore(): void
{
$definition = new SynapseTableDefinition(
Expand Down Expand Up @@ -196,6 +239,47 @@ public function testCreateStagingTableDefinitionWithText(): void
);
}

/**
 * Text-typed staging variant: a destination table with REPLICATE distribution
 * must yield a staging definition with ROUND_ROBIN distribution and NVARCHAR columns.
 */
public function testCreateStagingTableDefinitionWithTextReplicate(): void
{
    $destination = new SynapseTableDefinition(
        'schema',
        'table',
        false,
        new ColumnCollection([
            new SynapseColumn('name', new Synapse(Synapse::TYPE_DATE)),
            SynapseColumn::createGenericColumn('id'),
        ]),
        [],
        new TableDistributionDefinition(TableDistributionDefinition::TABLE_DISTRIBUTION_REPLICATE),
        new TableIndexDefinition(TableIndexDefinition::TABLE_INDEX_TYPE_HEAP)
    );
    $sourceColumns = ['id', 'name', 'notInDef'];

    $staging = StageTableDefinitionFactory::createStagingTableDefinitionWithText($destination, $sourceColumns);

    self::assertSame('schema', $staging->getSchemaName());
    self::assertStringStartsWith('#__temp_csvimport', $staging->getTableName());
    self::assertTrue($staging->isTemporary());
    // column order follows the source columns, not the destination definition
    self::assertSame($sourceColumns, $staging->getColumnsNames());

    /** @var SynapseColumn[] $stagingColumns */
    $stagingColumns = iterator_to_array($staging->getColumnsDefinitions());
    // all three columns are NVARCHAR, including "name", which is DATE in the destination
    foreach ([0, 1, 2] as $position) {
        self::assertSame(
            Synapse::TYPE_NVARCHAR,
            $stagingColumns[$position]->getColumnDefinition()->getType()
        );
    }

    // REPLICATE from the destination is expected to come out as ROUND_ROBIN
    $distribution = $staging->getTableDistribution();
    self::assertSame(
        TableDistributionDefinition::TABLE_DISTRIBUTION_ROUND_ROBIN,
        $distribution->getDistributionName()
    );

    // index is heap
    $index = $staging->getTableIndex();
    self::assertSame(
        TableIndexDefinition::TABLE_INDEX_TYPE_HEAP,
        $index->getIndexType()
    );
}

public function testCreateStagingTableDefinitionWithTextWithIndex(): void
{
$definition = new SynapseTableDefinition(
Expand Down

0 comments on commit 74db52f

Please sign in to comment.