diff --git a/.env.dist b/.env.dist
index 3652e8ea..ae59a480 100644
--- a/.env.dist
+++ b/.env.dist
@@ -25,3 +25,8 @@ EXASOL_PASSWORD=exasol
# ID of build used in CI only
BUILD_PREFIX=
+
+TERADATA_HOST=
+TERADATA_USERNAME=
+TERADATA_PASSWORD=
+TERADATA_PORT=
diff --git a/Dockerfile b/Dockerfile
index d4d85c11..83090e76 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,3 +1,9 @@
+FROM quay.io/keboola/aws-cli
+ARG AWS_SECRET_ACCESS_KEY
+ARG AWS_ACCESS_KEY_ID
+RUN /usr/bin/aws s3 cp s3://keboola-drivers/teradata/tdodbc1710-17.10.00.08-1.x86_64.deb /tmp/teradata/tdodbc.deb
+RUN /usr/bin/aws s3 cp s3://keboola-drivers/teradata/utils/TeradataToolsAndUtilitiesBase__ubuntu_x8664.17.00.34.00.tar.gz /tmp/teradata/tdutils.tar.gz
+
FROM php:7.4-cli
ARG COMPOSER_FLAGS="--prefer-dist --no-interaction"
@@ -89,6 +95,31 @@ RUN set -ex; \
echo "\n[exasol]\nDriver=/opt/exasol/libexaodbc-uo2214lv2.so\n" >> /etc/odbcinst.ini;\
rm -rf /tmp/exasol;
+# Teradata ODBC
+COPY --from=0 /tmp/teradata/tdodbc.deb /tmp/teradata/tdodbc.deb
+COPY docker/teradata/odbc.ini /tmp/teradata/odbc_td.ini
+COPY docker/teradata/odbcinst.ini /tmp/teradata/odbcinst_td.ini
+
+RUN dpkg -i /tmp/teradata/tdodbc.deb \
+ && cat /tmp/teradata/odbc_td.ini >> /etc/odbc.ini \
+ && cat /tmp/teradata/odbcinst_td.ini >> /etc/odbcinst.ini \
+ && rm -r /tmp/teradata \
+ && docker-php-ext-configure pdo_odbc --with-pdo-odbc=unixODBC,/usr \
+ && docker-php-ext-install pdo_odbc
+
+ENV ODBCHOME=/opt/teradata/client/ODBC_64/
+ENV ODBCINI=/opt/teradata/client/ODBC_64/odbc.ini
+ENV ODBCINST=/opt/teradata/client/ODBC_64/odbcinst.ini
+ENV LD_LIBRARY_PATH=/opt/teradata/client/ODBC_64/lib
+
+# Teradata Utils
+COPY --from=0 /tmp/teradata/tdutils.tar.gz /tmp/teradata/tdutils.tar.gz
+RUN cd /tmp/teradata \
+ && tar -xvaf tdutils.tar.gz \
+ && sh /tmp/teradata/TeradataToolsAndUtilitiesBase/.setup.sh tptbase s3axsmod \
+ && rm -rf /var/lib/apt/lists/* \
+ && rm -rf /tmp/teradata
+
#php odbc
RUN docker-php-ext-configure pdo_odbc --with-pdo-odbc=unixODBC,/usr \
&& docker-php-ext-install pdo_odbc
diff --git a/README.md b/README.md
index 12869eae..0ae94b92 100644
--- a/README.md
+++ b/README.md
@@ -18,6 +18,13 @@
## Development
+### Docker
+
+Prepare `.env` (copy of `.env.dist`) and set up AWS keys which have access to the `keboola-drivers` bucket in order to build this image. Then run `docker-compose --env-file=.env.local build`
+
+The AWS credentials must also have access to the bucket specified in `AWS_S3_BUCKET`. This bucket has to contain testing data. Run `docker-compose run --rm dev composer loadS3` to load them up.
+
+
### Preparation
#### Azure
@@ -96,6 +103,15 @@ EXASOL_PASSWORD=
Obtain host (with port), username and password from Exasol SaaS for your testing DB and fill it in `.env` as desribed above. Make sure, that your account has enabled network for your IP.
+#### Teradata
+```bash
+TERADATA_HOST=
+TERADATA_USERNAME=
+TERADATA_PASSWORD=
+TERADATA_PORT=
+```
+
+
### Tests
Run tests with following command.
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 9a470d5b..fdd75d28 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -23,7 +23,7 @@ stages:
displayName: Build and test
jobs:
- job: Build
- timeoutInMinutes: 100
+ timeoutInMinutes: 200
displayName: Build
pool:
vmImage: 'ubuntu-latest'
@@ -71,6 +71,9 @@ stages:
- script: |
docker-compose build --pull production
displayName: 'Build project images'
+ env:
+ AWS_ACCESS_KEY_ID: $(DRIVERS_AWS_ACCESS_KEY_ID)
+ AWS_SECRET_ACCESS_KEY: $(DRIVERS_AWS_SECRET_ACCESS_KEY)
- script: |
docker-compose run production php -v
parallel -j12 --linebuffer docker-compose run production composer ::: \
@@ -137,7 +140,8 @@ stages:
tests-synapse-heap4000temp-hash \
tests-synapse-heap4000temp-optimized \
tests-synapse-heap4000temp-optimized-hash \
- tests-synapse-next
+ tests-synapse-next \
+ tests-teradata
PARALLEL_EXIT_CODE=$?
cat /tmp/parallel-joblog
sleep 1
@@ -166,6 +170,10 @@ stages:
EXASOL_HOST: $(EXASOL_CLUSTER_DNS)
EXASOL_USERNAME: devel
EXASOL_PASSWORD: $(EXA_SAAS_TOKEN)
+ TERADATA_HOST: $(TERADATA_HOST)
+ TERADATA_PASSWORD: $(TERADATA_PASSWORD)
+ TERADATA_PORT: $(TERADATA_PORT)
+ TERADATA_USERNAME: $(TERADATA_USERNAME)
- script: |
docker-compose stop
php ./provisioning/cli.php app:delete:synapse \
diff --git a/composer.json b/composer.json
index 87937230..f35d07aa 100644
--- a/composer.json
+++ b/composer.json
@@ -10,7 +10,9 @@
"keboola/php-csv-db-import": "^5.0",
"keboola/php-file-storage-utils": ">=0.2",
"keboola/table-backend-utils": "^0.18",
- "microsoft/azure-storage-blob": "^1.4"
+ "keboola/php-temp": "^1.0",
+ "microsoft/azure-storage-blob": "^1.4",
+ "symfony/process": "^4.4|^5.0"
},
"require-dev": {
"phpstan/phpstan": "^0.12.54",
@@ -18,9 +20,7 @@
"keboola/coding-standard": "^9.0",
"php-parallel-lint/php-parallel-lint": "^1.2",
"phpstan/extension-installer": "^1.0",
- "keboola/datadir-tests": "^2.0",
- "keboola/php-temp": "^1.0",
- "symfony/process": "^4.4|^5.0"
+ "keboola/datadir-tests": "^2.0"
},
"autoload": {
"psr-4": {
@@ -49,6 +49,7 @@
"tests-synapse-clusterdindextemp": "SUITE=tests-synapse-clusterdindextemp CREDENTIALS_IMPORT_TYPE=SAS CREDENTIALS_EXPORT_TYPE=MASTER_KEY TEMP_TABLE_TYPE=CLUSTERED_INDEX DEDUP_TYPE=TMP_TABLE phpunit --colors=always --testsuite tests-synapse-clusterdindextemp",
"tests-synapse-mi": "SUITE=tests-synapse-mi CREDENTIALS_IMPORT_TYPE=MANAGED_IDENTITY CREDENTIALS_EXPORT_TYPE=MANAGED_IDENTITY TEMP_TABLE_TYPE=HEAP DEDUP_TYPE=TMP_TABLE phpunit --colors=always --testsuite synapse-mi",
"tests-exasol": "SUITE=tests-exasol STORAGE_TYPE=S3 phpunit --colors=always --testsuite exasol",
+ "tests-teradata": "SUITE=tests-teradata STORAGE_TYPE=S3 phpunit --colors=always --testsuite tests-teradata",
"tests-functional": [
"@tests-snowflake-abs",
"@tests-snowflake-s3",
@@ -58,7 +59,8 @@
"@tests-synapse-clusterdindextemp",
"@tests-synapse-heap4000temp",
"@tests-synapse-heap4000temp-optimized",
- "@tests-exasol"
+ "@tests-exasol",
+ "@tests-teradata"
],
"tests": [
"@tests-unit",
diff --git a/docker-compose.yml b/docker-compose.yml
index 153c5249..1870e4b3 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,7 +1,11 @@
version: '3'
services:
production: &prod
- build: .
+ build:
+ context: .
+ args:
+ - AWS_ACCESS_KEY_ID
+ - AWS_SECRET_ACCESS_KEY
environment:
- ABS_ACCOUNT_NAME
- ABS_ACCOUNT_KEY
@@ -24,6 +28,10 @@ services:
- EXASOL_USERNAME
- EXASOL_PASSWORD
- BUILD_PREFIX
+ - TERADATA_HOST
+ - TERADATA_USERNAME
+ - TERADATA_PASSWORD
+ - TERADATA_PORT
dev: &dev
<<: *prod
image: keboola/php-db-import-export
diff --git a/docker/teradata/odbc.ini b/docker/teradata/odbc.ini
new file mode 100644
index 00000000..151d655f
--- /dev/null
+++ b/docker/teradata/odbc.ini
@@ -0,0 +1,14 @@
+[ODBC]
+InstallDir=/opt/teradata/client/ODBC_64
+Trace=0
+TraceDll=/opt/teradata/client/ODBC_64/lib/odbctrac.so
+TraceFile=/usr/odbcusr/trace.log
+TraceAutoStop=0
+
+[Teradata]
+Driver = /opt/teradata/client/ODBC_64/lib/tdataodbc_sb64.so
+UsageCount = 2
+APILevel = CORE
+ConnectFunctions = YYY
+DriverODBCVer = 3.51
+SQLLevel = 1
diff --git a/docker/teradata/odbcinst.ini b/docker/teradata/odbcinst.ini
new file mode 100644
index 00000000..af1a3dc2
--- /dev/null
+++ b/docker/teradata/odbcinst.ini
@@ -0,0 +1,9 @@
+[ODBC DRIVERS]
+Teradata=Installed
+
+[Teradata]
+Driver=/opt/teradata/client/ODBC_64/lib/tdataodbc_sb64.so
+APILevel=CORE
+ConnectFunctions=YYY
+DriverODBCVer=3.51
+SQLLevel=1
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
index d737710a..a20d82d0 100644
--- a/phpunit.xml.dist
+++ b/phpunit.xml.dist
@@ -61,6 +61,10 @@
tests/functional/Synapse/SqlCommandBuilderTest.php
+
+ tests/functional/Teradata
+
+
tests/unit
diff --git a/src/Backend/Teradata/Helper/BackendHelper.php b/src/Backend/Teradata/Helper/BackendHelper.php
new file mode 100644
index 00000000..ae708405
--- /dev/null
+++ b/src/Backend/Teradata/Helper/BackendHelper.php
@@ -0,0 +1,67 @@
+ file0*
+ * TODO
+ * - has to fix edgecases a) [1_file.csv, 2_file.csv] b) not all the files matched in WC have to be on s3
+ * @param SourceFile $source
+ * @return string
+ * @throws \Keboola\Db\Import\Exception
+ */
+ public static function getMask(SourceFile $source): string
+ {
+ $entries = $source->getManifestEntries();
+ if (count($entries) === 0) {
+ // no entries -> no data to load
+ return '';
+ }
+ // SourceDirectory returns fileName as directory/file.csv but SourceFile returns s3://bucket/directory/file.csv
+ $toRemove = $source->getS3Prefix() . '/';
+ $entriesAsArrays = [];
+ $min = 99999;
+ $minIndex = 0;
+ foreach ($entries as $i => $entry) {
+ $entry = str_replace($toRemove, '', $entry);
+ $asArray = str_split($entry);
+ $entriesAsArrays[] = $asArray;
+ $thisSize = count($asArray);
+ if ($thisSize < $min) {
+ $min = $thisSize;
+ $minIndex = $i;
+ }
+ }
+ $out = [];
+
+ foreach ($entriesAsArrays[$minIndex] as $index => $letter) {
+ $match = true;
+
+ foreach ($entriesAsArrays as $fileName) {
+ if ($fileName[$index] !== $letter) {
+ $match = false;
+ break;
+ }
+ }
+ $out[$index] = $match ? $letter : '*';
+ }
+ return implode('', $out);
+ }
+}
diff --git a/src/Backend/Teradata/TeradataException.php b/src/Backend/Teradata/TeradataException.php
new file mode 100644
index 00000000..9057c5e4
--- /dev/null
+++ b/src/Backend/Teradata/TeradataException.php
@@ -0,0 +1,17 @@
+teradataHost = $teradataHost;
+ $this->teradataUser = $teradataUser;
+ $this->teradataPassword = $teradataPassword;
+ $this->teradataPort = $teradataPort;
+ }
+
+ public function getTeradataHost(): string
+ {
+ return $this->teradataHost;
+ }
+
+ public function getTeradataUser(): string
+ {
+ return $this->teradataUser;
+ }
+
+ public function getTeradataPassword(): string
+ {
+ return $this->teradataPassword;
+ }
+
+ public function getTeradataPort(): int
+ {
+ return $this->teradataPort;
+ }
+}
diff --git a/src/Backend/Teradata/ToFinalTable/FullImporter.php b/src/Backend/Teradata/ToFinalTable/FullImporter.php
new file mode 100644
index 00000000..0072ceba
--- /dev/null
+++ b/src/Backend/Teradata/ToFinalTable/FullImporter.php
@@ -0,0 +1,128 @@
+connection = $connection;
+ $this->sqlBuilder = new SqlBuilder();
+ }
+
+ private function doLoadFullWithoutDedup(
+ TeradataTableDefinition $stagingTableDefinition,
+ TeradataTableDefinition $destinationTableDefinition,
+ TeradataImportOptions $options,
+ ImportState $state
+ ): void {
+ // truncate destination table
+ $this->connection->executeStatement(
+ $this->sqlBuilder->getTruncateTableWithDeleteCommand(
+ $destinationTableDefinition->getSchemaName(),
+ $destinationTableDefinition->getTableName()
+ )
+ );
+ $state->startTimer(self::TIMER_COPY_TO_TARGET);
+
+ // move data with INSERT INTO
+ $sql = $this->sqlBuilder->getInsertAllIntoTargetTableCommand(
+ $stagingTableDefinition,
+ $destinationTableDefinition,
+ $options,
+ DateTimeHelper::getNowFormatted()
+ );
+ $this->connection->executeStatement(
+ $sql
+ );
+ $state->stopTimer(self::TIMER_COPY_TO_TARGET);
+ }
+
+ public function importToTable(
+ TableDefinitionInterface $stagingTableDefinition,
+ TableDefinitionInterface $destinationTableDefinition,
+ ImportOptionsInterface $options,
+ ImportState $state
+ ): Result {
+ assert($stagingTableDefinition instanceof TeradataTableDefinition);
+ assert($destinationTableDefinition instanceof TeradataTableDefinition);
+ assert($options instanceof TeradataImportOptions);
+
+ /** @var TeradataTableDefinition $destinationTableDefinition */
+ try {
+ //import files to staging table
+ if (!empty($destinationTableDefinition->getPrimaryKeysNames())) {
+ // dedup
+ throw new \Exception('not implemented yet');
+ } else {
+ $this->doLoadFullWithoutDedup(
+ $stagingTableDefinition,
+ $destinationTableDefinition,
+ $options,
+ $state
+ );
+ }
+ } catch (Exception $e) {
+ throw TeradataException::covertException($e);
+ } finally {
+ $tmpTableName = $destinationTableDefinition->getTableName() . self::OPTIMIZED_LOAD_TMP_TABLE_SUFFIX;
+ $renameTableName = $destinationTableDefinition->getTableName() . self::OPTIMIZED_LOAD_RENAME_TABLE_SUFFIX;
+
+        if ($this->tableExists($destinationTableDefinition->getSchemaName(), $tmpTableName)) {
+ // drop optimized load tmp table if exists
+ $this->connection->executeStatement(
+ $this->sqlBuilder->getDropTableUnsafe(
+ $destinationTableDefinition->getSchemaName(),
+ $tmpTableName
+ )
+ );
+ }
+
+ if ($this->tableExists($destinationTableDefinition->getSchemaName(), $renameTableName)) {
+ // drop optimized load rename table if exists
+ $this->connection->executeStatement(
+ $this->sqlBuilder->getDropTableUnsafe(
+ $destinationTableDefinition->getSchemaName(),
+ $renameTableName
+ )
+ );
+ }
+ }
+
+ $state->setImportedColumns($stagingTableDefinition->getColumnsNames());
+
+ return $state->getResult();
+ }
+
+ protected function tableExists(string $dbName, string $tableName): bool
+ {
+ $data = $this->connection->fetchOne($this->sqlBuilder->getTableExistsCommand($dbName, $tableName));
+ return ((int) $data) > 0;
+ }
+}
diff --git a/src/Backend/Teradata/ToFinalTable/SqlBuilder.php b/src/Backend/Teradata/ToFinalTable/SqlBuilder.php
new file mode 100644
index 00000000..42fd7e79
--- /dev/null
+++ b/src/Backend/Teradata/ToFinalTable/SqlBuilder.php
@@ -0,0 +1,154 @@
+getSchemaName()),
+ TeradataQuote::quoteSingleIdentifier($destinationTableDefinition->getTableName())
+ );
+
+ $columnsToInsert = $sourceTableDefinition->getColumnsNames();
+ $useTimestamp = !in_array(ToStageImporterInterface::TIMESTAMP_COLUMN_NAME, $columnsToInsert, true)
+ && $importOptions->useTimestamp();
+
+ if ($useTimestamp) {
+ $columnsToInsert = array_merge(
+ $sourceTableDefinition->getColumnsNames(),
+ [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME]
+ );
+ }
+
+ $columnsSetSql = [];
+
+ /** @var TeradataColumn $columnDefinition */
+ foreach ($sourceTableDefinition->getColumnsDefinitions() as $columnDefinition) {
+ if (in_array($columnDefinition->getColumnName(), $importOptions->getConvertEmptyValuesToNull(), true)) {
+ // use nullif only for string base type
+ if ($columnDefinition->getColumnDefinition()->getBasetype() === BaseType::STRING) {
+ $columnsSetSql[] = sprintf(
+ 'NULLIF(%s, \'\')',
+ TeradataQuote::quoteSingleIdentifier($columnDefinition->getColumnName())
+ );
+ } else {
+ $columnsSetSql[] = TeradataQuote::quoteSingleIdentifier($columnDefinition->getColumnName());
+ }
+ } elseif ($columnDefinition->getColumnDefinition()->getBasetype() === BaseType::STRING) {
+ $columnsSetSql[] = sprintf(
+ 'CAST(COALESCE(%s, \'\') as %s) AS %s',
+ TeradataQuote::quoteSingleIdentifier($columnDefinition->getColumnName()),
+ $columnDefinition->getColumnDefinition()->buildTypeWithLength(),
+ TeradataQuote::quoteSingleIdentifier($columnDefinition->getColumnName())
+ );
+ } else {
+ // on columns other than string dont use COALESCE, use direct cast
+ // this will fail if the column is not null, but this is expected
+ $columnsSetSql[] = sprintf(
+ 'CAST(%s as %s) AS %s',
+ TeradataQuote::quoteSingleIdentifier($columnDefinition->getColumnName()),
+ $columnDefinition->getColumnDefinition()->buildTypeWithLength(),
+ TeradataQuote::quoteSingleIdentifier($columnDefinition->getColumnName())
+ );
+ }
+ }
+
+ if ($useTimestamp) {
+ $columnsSetSql[] = TeradataQuote::quote($timestamp);
+ }
+
+ return sprintf(
+ 'INSERT INTO %s (%s) SELECT %s FROM %s.%s AS %s',
+ $destinationTable,
+ $this->getColumnsString($columnsToInsert),
+ implode(',', $columnsSetSql),
+ TeradataQuote::quoteSingleIdentifier($sourceTableDefinition->getSchemaName()),
+ TeradataQuote::quoteSingleIdentifier($sourceTableDefinition->getTableName()),
+ TeradataQuote::quoteSingleIdentifier(self::SRC_ALIAS)
+ );
+ }
+
+ public function getTruncateTableWithDeleteCommand(
+ string $schema,
+ string $tableName
+ ): string {
+ return sprintf(
+ 'DELETE FROM %s.%s',
+ TeradataQuote::quoteSingleIdentifier($schema),
+ TeradataQuote::quoteSingleIdentifier($tableName)
+ );
+ }
+
+ /**
+ * @param string[] $columns
+ */
+ public function getColumnsString(
+ array $columns,
+ string $delimiter = ', ',
+ ?string $tableAlias = null
+ ): string {
+ return implode($delimiter, array_map(static function ($columns) use (
+ $tableAlias
+ ) {
+ $alias = $tableAlias === null ? '' : $tableAlias . '.';
+ return $alias . TeradataQuote::quoteSingleIdentifier($columns);
+ }, $columns));
+ }
+
+ public function getDeleteOldItemsCommand(
+ TeradataTableDefinition $stagingTableDefinition,
+ TeradataTableDefinition $destinationTableDefinition
+ ): void {
+ throw new \Exception('not implemented yet');
+ }
+}
diff --git a/src/Backend/Teradata/ToStage/Exception/FailedTPTLoadException.php b/src/Backend/Teradata/ToStage/Exception/FailedTPTLoadException.php
new file mode 100644
index 00000000..54f6c287
--- /dev/null
+++ b/src/Backend/Teradata/ToStage/Exception/FailedTPTLoadException.php
@@ -0,0 +1,105 @@
+stdErr = $stdErr;
+ $this->stdOut = $stdOut;
+ $this->logContent = $logContent;
+ $this->exitCode = $exitCode;
+ $this->logTableContent = $logTableContent;
+ $this->errTableContent = $errTableContent;
+ $this->errTable2Content = $errTable2Content;
+ }
+
+ public function getStdErr(): string
+ {
+ return $this->stdErr;
+ }
+
+ public function getStdOut(): string
+ {
+ return $this->stdOut;
+ }
+
+ public function getLogContent(): ?string
+ {
+ return $this->logContent;
+ }
+
+ public function getExitCode(): ?int
+ {
+ return $this->exitCode;
+ }
+
+ /**
+ * @return array|string[]|null
+ */
+ public function getLogTableContent(): ?array
+ {
+ return $this->logTableContent;
+ }
+
+ /**
+ * @return array|string[]|null
+ */
+ public function getErrTableContent(): ?array
+ {
+ return $this->errTableContent;
+ }
+
+ /**
+ * @return array|string[]|null
+ */
+ public function getErrTable2Content(): ?array
+ {
+ return $this->errTable2Content;
+ }
+}
diff --git a/src/Backend/Teradata/ToStage/FromS3TPTAdapter.php b/src/Backend/Teradata/ToStage/FromS3TPTAdapter.php
new file mode 100644
index 00000000..832a026f
--- /dev/null
+++ b/src/Backend/Teradata/ToStage/FromS3TPTAdapter.php
@@ -0,0 +1,304 @@
+connection = $connection;
+ }
+
+ /**
+ * @param Storage\S3\SourceFile $source
+ * @param TeradataTableDefinition $destination
+ * @param ImportOptions $importOptions
+ */
+ public function runCopyCommand(
+ Storage\SourceInterface $source,
+ TableDefinitionInterface $destination,
+ ImportOptionsInterface $importOptions
+ ): int {
+ assert($source instanceof Storage\S3\SourceFile);
+ assert($destination instanceof TeradataTableDefinition);
+ assert($importOptions instanceof TeradataImportOptions);
+
+ // empty manifest. TPT cannot import empty data
+ if ($source->isSliced() && count($source->getManifestEntries()) === 0) {
+ return 0;
+ }
+
+ /**
+ * @var Temp $temp
+ */
+ [
+ $temp,
+ $logTable,
+ $errTable,
+ $errTable2,
+ $processCmd,
+ ] = $this->generateTPTScript($source, $destination, $importOptions);
+
+ $process = new Process(
+ $processCmd,
+ null,
+ [
+ 'AWS_ACCESS_KEY_ID' => $source->getKey(),
+ 'AWS_SECRET_ACCESS_KEY' => $source->getSecret(),
+ ]
+ );
+ $process->start();
+ // check end of process
+ $process->wait();
+
+ // debug stuff
+// foreach ($process as $type => $data) {
+// if ($process::OUT === $type) {
+// echo "\nRead from stdout: " . $data;
+// } else { // $process::ERR === $type
+// echo "\nRead from stderr: " . $data;
+// }
+// }
+ $qb = new SqlBuilder();
+ $isTableExists = function (string $databaseName, string $tableName) use ($qb) {
+ return (bool) $this->connection->fetchOne($qb->getTableExistsCommand($databaseName, $tableName));
+ };
+
+ $logContent = null;
+ if ($isTableExists($destination->getSchemaName(), $logTable)) {
+ $logContent = $this->connection->fetchAllAssociative(
+ sprintf(
+ 'SELECT * FROM %s.%s',
+ TeradataQuote::quoteSingleIdentifier($destination->getSchemaName()),
+ TeradataQuote::quoteSingleIdentifier($logTable)
+ )
+ );
+ $this->connection->executeStatement($qb->getDropTableUnsafe($destination->getSchemaName(), $logTable));
+ }
+ $errContent = null;
+ if ($isTableExists($destination->getSchemaName(), $errTable)) {
+ $errContent = $this->connection->fetchAllAssociative(sprintf(
+ 'SELECT * FROM %s.%s',
+ TeradataQuote::quoteSingleIdentifier($destination->getSchemaName()),
+ TeradataQuote::quoteSingleIdentifier($errTable)
+ ));
+ $this->connection->executeStatement($qb->getDropTableUnsafe($destination->getSchemaName(), $errTable));
+ }
+ $err2Content = null;
+ if ($isTableExists($destination->getSchemaName(), $errTable2)) {
+ $err2Content = $this->connection->fetchAllAssociative(sprintf(
+ 'SELECT * FROM %s.%s',
+ TeradataQuote::quoteSingleIdentifier($destination->getSchemaName()),
+ TeradataQuote::quoteSingleIdentifier($errTable2)
+ ));
+ $this->connection->executeStatement($qb->getDropTableUnsafe($destination->getSchemaName(), $errTable2));
+ }
+ // TODO find the way how to get this out
+
+ if ($process->getExitCode() !== 0 || $errContent || $err2Content) {
+ $qb = new TeradataTableQueryBuilder();
+ // drop destination table it's not usable
+ $this->connection->executeStatement($qb->getDropTableCommand(
+ $destination->getSchemaName(),
+ $destination->getTableName()
+ ));
+
+ throw new FailedTPTLoadException(
+ $process->getErrorOutput(),
+ $process->getOutput(),
+ $process->getExitCode(),
+ $this->getLogData($temp),
+ $logContent,
+ $errContent,
+ $err2Content
+ );
+ }
+
+ $ref = new TeradataTableReflection(
+ $this->connection,
+ $destination->getSchemaName(),
+ $destination->getTableName()
+ );
+
+ return $ref->getRowsCount();
+ }
+
+ private function getLogData(Temp $temp): string
+ {
+ if (file_exists($temp->getTmpFolder() . '/import-1.out')) {
+ return file_get_contents($temp->getTmpFolder() . '/import-1.out') ?: 'unable to get error';
+ }
+
+ return 'unable to get error';
+ }
+
+ /**
+ * @return array{0: Temp, 1:string, 2:string, 3:string, 4: string[]}
+ */
+ private function generateTPTScript(
+ Storage\S3\SourceFile $source,
+ TeradataTableDefinition $destination,
+ TeradataImportOptions $importOptions
+ ): array {
+ $temp = new Temp();
+ $temp->initRunFolder();
+ $folder = $temp->getTmpFolder();
+ $target = sprintf(
+ '%s.%s',
+ TeradataQuote::quoteSingleIdentifier($destination->getSchemaName()),
+ TeradataQuote::quoteSingleIdentifier($destination->getTableName()),
+ );
+
+ if ($source->isSliced()) {
+ $mask = BackendHelper::getMask($source);
+ $path = RelativePath::createFromRootAndPath(new S3Provider(), $source->getBucket(), $mask);
+ $moduleStr = sprintf(
+ // phpcs:ignore
+ 'AccessModuleInitStr = \'S3Region="%s" S3Bucket="%s" S3Prefix="%s" S3Object="%s" S3SinglePartFile=True\'',
+ $source->getRegion(),
+ $path->getRoot(),
+ $path->getPathWithoutRoot() . '/',
+ $path->getFileName()
+ );
+ } else {
+ $moduleStr = sprintf(
+ // phpcs:ignore
+ 'AccessModuleInitStr = \'S3Region="%s" S3Bucket="%s" S3Prefix="%s" S3Object="%s" S3SinglePartFile=True\'',
+ $source->getRegion(),
+ $source->getBucket(),
+ $source->getPrefix(),
+ $source->getFileName()
+ );
+ }
+ $tptScript = <<getTeradataHost();
+ $user = $importOptions->getTeradataUser();
+ $pass = $importOptions->getTeradataPassword();
+ $csvOptions = $source->getCsvOptions();
+ $delimiter = $csvOptions->getDelimiter();
+ if ($delimiter === "\t") {
+ $delimiter = 'TAB';
+ }
+ $enclosure = $csvOptions->getEnclosure();
+ if ($enclosure === '\'') {
+ $enclosure = '\\\'';
+ }
+ $escapedBy = $csvOptions->getEscapedBy();
+ if ($escapedBy !== '') {
+ $escapedBy = sprintf(',FileReaderEscapeQuoteDelimiter = \'%s\'', $escapedBy);
+ }
+ $ignoredLines = $importOptions->getNumberOfIgnoredLines();
+
+ $quotedDestination = TeradataQuote::quoteSingleIdentifier($destination->getSchemaName());
+ $tablesPrefix = BackendHelper::generateTempTableName();
+ $logTable = $tablesPrefix . '_log';
+ $logTableQuoted = $quotedDestination . '.' . TeradataQuote::quoteSingleIdentifier($logTable);
+ $errTable1 = $tablesPrefix . '_e1';
+ $errTableQuoted = $quotedDestination . '.' . TeradataQuote::quoteSingleIdentifier($errTable1);
+ $errTable2 = $tablesPrefix . '_e2';
+ $errTable2Quoted = $quotedDestination . '.' . TeradataQuote::quoteSingleIdentifier($errTable2);
+
+ $jobVariableFile = <<connection = $connection;
+ }
+
+ public function runCopyCommand(
+ Storage\SourceInterface $source,
+ TableDefinitionInterface $destination,
+ ImportOptionsInterface $importOptions
+ ): int {
+ assert($source instanceof SelectSource || $source instanceof Table);
+ assert($destination instanceof TeradataTableDefinition);
+ assert($importOptions instanceof TeradataImportOptions);
+
+ $quotedColumns = array_map(static function ($column) {
+ return TeradataQuote::quoteSingleIdentifier($column);
+ }, $destination->getColumnsNames());
+
+ $sql = sprintf(
+ 'INSERT INTO %s.%s (%s) %s',
+ TeradataQuote::quoteSingleIdentifier($destination->getSchemaName()),
+ TeradataQuote::quoteSingleIdentifier($destination->getTableName()),
+ implode(', ', $quotedColumns),
+ $source->getFromStatement()
+ );
+
+ if ($source instanceof SelectSource) {
+ $this->connection->executeQuery($sql, $source->getQueryBindings(), $source->getDataTypes());
+ } else {
+ $this->connection->executeStatement($sql);
+ }
+
+ $ref = new TeradataTableReflection(
+ $this->connection,
+ $destination->getSchemaName(),
+ $destination->getTableName()
+ );
+
+ return $ref->getRowsCount();
+ }
+}
diff --git a/src/Backend/Teradata/ToStage/StageTableDefinitionFactory.php b/src/Backend/Teradata/ToStage/StageTableDefinitionFactory.php
new file mode 100644
index 00000000..9dae73fb
--- /dev/null
+++ b/src/Backend/Teradata/ToStage/StageTableDefinitionFactory.php
@@ -0,0 +1,92 @@
+getColumnsDefinitions() as $definition) {
+ if ($definition->getColumnName() === $columnName) {
+ // if column exists in destination set destination type
+ $newDefinitions[] = new TeradataColumn(
+ $columnName,
+ new Teradata(
+ $definition->getColumnDefinition()->getType(),
+ [
+ 'length' => $definition->getColumnDefinition()->getLength(),
+ 'nullable' => true,
+ 'default' => $definition->getColumnDefinition()->getDefault(),
+ ]
+ )
+ );
+ continue 2;
+ }
+ }
+ // if column doesn't exists in destination set default type
+ $newDefinitions[] = self::createVarcharColumn($columnName);
+ }
+
+ return new TeradataTableDefinition(
+ $destination->getSchemaName(),
+ BackendHelper::generateTempTableName(),
+ true,
+ new ColumnCollection($newDefinitions),
+ $destination->getPrimaryKeysNames()
+ );
+ }
+
+ private static function createVarcharColumn(string $columnName): TeradataColumn
+ {
+ return new TeradataColumn(
+ $columnName,
+ new Teradata(
+ Teradata::TYPE_VARCHAR,
+ [
+ 'length' => 32000,
+ 'nullable' => true,
+ ]
+ )
+ );
+ }
+
+ /**
+ * @param string[] $sourceColumnsNames
+ */
+ public static function createStagingTableDefinitionWithText(
+ TeradataTableDefinition $destination,
+ array $sourceColumnsNames
+ ): TeradataTableDefinition {
+ $newDefinitions = [];
+ foreach ($sourceColumnsNames as $columnName) {
+ $newDefinitions[] = self::createVarcharColumn($columnName);
+ }
+
+ return new TeradataTableDefinition(
+ $destination->getSchemaName(),
+ BackendHelper::generateTempTableName(),
+ true,
+ new ColumnCollection($newDefinitions),
+ $destination->getPrimaryKeysNames()
+ );
+ }
+}
diff --git a/src/Backend/Teradata/ToStage/ToStageImporter.php b/src/Backend/Teradata/ToStage/ToStageImporter.php
new file mode 100644
index 00000000..c5962602
--- /dev/null
+++ b/src/Backend/Teradata/ToStage/ToStageImporter.php
@@ -0,0 +1,76 @@
+connection = $connection;
+ }
+
+ public function importToStagingTable(
+ Storage\SourceInterface $source,
+ TableDefinitionInterface $destinationDefinition,
+ ImportOptionsInterface $options
+ ): ImportState {
+ assert($destinationDefinition instanceof TeradataTableDefinition);
+ $state = new ImportState($destinationDefinition->getTableName());
+
+ $adapter = $this->getAdapter($source);
+
+ $state->startTimer(self::TIMER_TABLE_IMPORT);
+ try {
+ $state->addImportedRowsCount(
+ $adapter->runCopyCommand(
+ $source,
+ $destinationDefinition,
+ $options
+ )
+ );
+ } catch (\Doctrine\DBAL\Exception $e) {
+ throw SynapseException::covertException($e);
+ }
+ $state->stopTimer(self::TIMER_TABLE_IMPORT);
+
+ return $state;
+ }
+
+ private function getAdapter(Storage\SourceInterface $source): CopyAdapterInterface
+ {
+ switch (true) {
+ case $source instanceof Storage\S3\SourceFile:
+ $adapter = new FromS3TPTAdapter($this->connection);
+ break;
+ case $source instanceof Storage\SqlSourceInterface:
+ $adapter = new FromTableInsertIntoAdapter($this->connection);
+ break;
+ default:
+ throw new LogicException(
+ sprintf(
+ 'No suitable adapter found for source: "%s".',
+ get_class($source)
+ )
+ );
+ }
+ return $adapter;
+ }
+}
diff --git a/src/Exception/Exception.php b/src/Exception/Exception.php
new file mode 100644
index 00000000..20411319
--- /dev/null
+++ b/src/Exception/Exception.php
@@ -0,0 +1,9 @@
+filePath;
}
+ public function getBucket(): string
+ {
+ return $this->bucket;
+ }
+
public function getBucketURL(): string
{
return sprintf('https://%s.s3.%s.amazonaws.com', $this->bucket, $this->region);
diff --git a/src/Storage/S3/SourceDirectory.php b/src/Storage/S3/SourceDirectory.php
index f3188880..d77ec56e 100644
--- a/src/Storage/S3/SourceDirectory.php
+++ b/src/Storage/S3/SourceDirectory.php
@@ -7,16 +7,13 @@
class SourceDirectory extends SourceFile
{
/**
+ * returns all files in directory
* @return string[]
*/
public function getManifestEntries(): array
{
$client = $this->getClient();
- $prefix = $this->filePath;
- if (substr($prefix, -1) !== '/') {
- // add trailing slash if not set to list only blobs in folder
- $prefix .= '/';
- }
+ $prefix = $this->getPrefix();
$response = $client->listObjectsV2([
'Bucket' => $this->bucket,
'Delimiter' => '/',
@@ -27,4 +24,15 @@ public function getManifestEntries(): array
return $file['Key'];
}, $response->get('Contents'));
}
+
+ public function getPrefix(): string
+ {
+ $prefix = $this->filePath;
+ if (substr($prefix, -1) !== '/') {
+ // add trailing slash if not set to list only blobs in folder
+ $prefix .= '/';
+ }
+
+ return $prefix;
+ }
}
diff --git a/src/Storage/S3/SourceFile.php b/src/Storage/S3/SourceFile.php
index 09defe95..da53f505 100644
--- a/src/Storage/S3/SourceFile.php
+++ b/src/Storage/S3/SourceFile.php
@@ -94,6 +94,11 @@ public function getManifestEntries(): array
}, $manifest['entries']);
}
+ public function isSliced(): bool
+ {
+ return $this->isSliced;
+ }
+
/**
* @return string[]|null
*/
@@ -101,4 +106,39 @@ public function getPrimaryKeysNames(): ?array
{
return $this->primaryKeysNames;
}
+
+ /**
+ * from path data/shared/file.csv to file.csv
+ * @return string
+ * @throws \Exception
+ */
+ public function getFileName(): string
+ {
+ if ($this->isSliced) {
+ throw new \Exception('Not supported getFileName for sliced files.');
+ }
+ $fileName = $this->filePath;
+ if (strrpos($fileName, '/') !== false) {
+ // there is dir in the path
+ return substr($fileName, strrpos($fileName, '/') + 1);
+ }
+ // there is no dir in the path, just the filename
+ return $fileName;
+ }
+
+ /**
+ * from path data/shared/file.csv to data/shared/
+ * @return string
+ * @throws \Exception
+ */
+ public function getPrefix(): string
+ {
+ $prefix = $this->filePath;
+ $prefixLength = strrpos($prefix, '/');
+ if ($prefixLength !== false) {
+ // include / at the end
+ return substr($prefix, 0, $prefixLength + 1);
+ }
+ return '';
+ }
}
diff --git a/src/Storage/Teradata/SelectSource.php b/src/Storage/Teradata/SelectSource.php
new file mode 100644
index 00000000..6bcf22cb
--- /dev/null
+++ b/src/Storage/Teradata/SelectSource.php
@@ -0,0 +1,89 @@
+query = $query;
+ $this->queryBindings = $queryBindings;
+ $this->dataTypes = $dataTypes;
+ $this->columnsNames = $columnsNames;
+ $this->primaryKeysNames = $primaryKeysNames;
+ }
+
+ /**
+ * @return string[]
+ */
+ public function getColumnsNames(): array
+ {
+ return $this->columnsNames;
+ }
+
+ /**
+ * @return string[]
+ */
+ public function getDataTypes(): array
+ {
+ return $this->dataTypes;
+ }
+
+ public function getFromStatement(): string
+ {
+ return sprintf('%s', $this->getQuery());
+ }
+
+ public function getQuery(): string
+ {
+ return $this->query;
+ }
+
+ /**
+ * @return string[]|null
+ */
+ public function getPrimaryKeysNames(): ?array
+ {
+ return $this->primaryKeysNames;
+ }
+
+ /**
+ * @return array|string[]
+ */
+ public function getQueryBindings(): array
+ {
+ return $this->queryBindings;
+ }
+}
diff --git a/src/Storage/Teradata/Table.php b/src/Storage/Teradata/Table.php
new file mode 100644
index 00000000..fb46bfd6
--- /dev/null
+++ b/src/Storage/Teradata/Table.php
@@ -0,0 +1,98 @@
+schema = $schema;
+ $this->tableName = $tableName;
+ $this->columnsNames = $columns;
+ $this->primaryKeysNames = $primaryKeysNames;
+ }
+
+ public function getFromStatement(): string
+ {
+ $select = '*';
+ $columns = $this->getColumnsNames();
+ if ($columns !== []) {
+ $quotedColumns = array_map(static function ($column) {
+ // trim because implicit casting adds right padding spaces
+ // value 10.5 as DECIMAL(8,1) implicitly casted to varchar would be then " 10.5"
+ return sprintf('TRIM(%s)', TeradataQuote::quoteSingleIdentifier($column));
+ }, $columns);
+ $select = implode(', ', $quotedColumns);
+ }
+
+ return sprintf('SELECT %s FROM %s', $select, $this->getQuotedTableWithScheme());
+ }
+
+ /**
+ * @return string[]
+ */
+ public function getColumnsNames(): array
+ {
+ return $this->columnsNames;
+ }
+
+ public function getQuotedTableWithScheme(): string
+ {
+ return sprintf(
+ '%s.%s',
+ TeradataQuote::quoteSingleIdentifier($this->getSchema()),
+ TeradataQuote::quoteSingleIdentifier($this->getTableName())
+ );
+ }
+
+ public function getPrimaryKeysNames(): ?array
+ {
+ return $this->primaryKeysNames;
+ }
+
+ /**
+ * @return string[]
+ */
+ public function getQueryBindings(): array
+ {
+ // TODO
+ return [];
+ }
+
+ public function getSchema(): string
+ {
+ return $this->schema;
+ }
+
+ public function getTableName(): string
+ {
+ return $this->tableName;
+ }
+}
diff --git a/tests/data/csv/simple/a_b_c-1row.csv b/tests/data/csv/simple/a_b_c-1row.csv
new file mode 100644
index 00000000..0eadb693
--- /dev/null
+++ b/tests/data/csv/simple/a_b_c-1row.csv
@@ -0,0 +1,2 @@
+a,b,c
+1,2,3
\ No newline at end of file
diff --git a/tests/functional/Teradata/FullImportTest.php b/tests/functional/Teradata/FullImportTest.php
new file mode 100644
index 00000000..e342f97b
--- /dev/null
+++ b/tests/functional/Teradata/FullImportTest.php
@@ -0,0 +1,487 @@
+cleanDatabase($this->getDestinationDbName());
+ $this->createDatabase($this->getDestinationDbName());
+
+ $this->cleanDatabase($this->getSourceDbName());
+ $this->createDatabase($this->getSourceDbName());
+ }
+
+ protected function tearDown(): void
+ {
+ $this->connection->close();
+ parent::tearDown();
+ }
+
+ public function testLoadToFinalTableWithoutDedup(): void
+ {
+ // table translations checks numeric and string-ish data
+ $this->initTable(self::TABLE_TRANSLATIONS);
+
+ // skipping header
+ $options = $this->getImportOptions([], false, false, 1);
+ $source = $this->createS3SourceInstance(
+ self::TABLE_TRANSLATIONS . '.csv',
+ [
+ 'id',
+ 'name',
+ 'price',
+ 'isDeleted',
+ ],
+ false,
+ false,
+ []
+ );
+
+ $importer = new ToStageImporter($this->connection);
+ $destinationRef = new TeradataTableReflection(
+ $this->connection,
+ $this->getDestinationDbName(),
+ self::TABLE_TRANSLATIONS
+ );
+ /** @var TeradataTableDefinition $destination */
+ $destination = $destinationRef->getTableDefinition();
+ $stagingTable = StageTableDefinitionFactory::createStagingTableDefinition($destination, [
+ 'id',
+ 'name',
+ 'price',
+ 'isDeleted',
+ ]);
+ $qb = new TeradataTableQueryBuilder();
+ $this->connection->executeStatement(
+ $qb->getCreateTableCommandFromDefinition($stagingTable)
+ );
+ $importState = $importer->importToStagingTable(
+ $source,
+ $stagingTable,
+ $options
+ );
+ $toFinalTableImporter = new FullImporter($this->connection);
+ $result = $toFinalTableImporter->importToTable(
+ $stagingTable,
+ $destination,
+ $options,
+ $importState
+ );
+
+ self::assertEquals(3, $destinationRef->getRowsCount());
+ }
+
+ /**
+ * @return Generator<string, array<mixed>>
+ */
+ public function fullImportData(): Generator
+ {
+ $expectedEscaping = [];
+ $file = new CsvFile(self::DATA_DIR . 'escaping/standard-with-enclosures.csv');
+ foreach ($file as $row) {
+ $expectedEscaping[] = $row;
+ }
+ $escapingHeader = array_shift($expectedEscaping); // remove header
+ $expectedEscaping = array_values($expectedEscaping);
+
+ $expectedAccounts = [];
+ $file = new CsvFile(self::DATA_DIR . 'tw_accounts.csv');
+ foreach ($file as $row) {
+ $expectedAccounts[] = $row;
+ }
+ $accountsHeader = array_shift($expectedAccounts); // remove header
+ $expectedAccounts = array_values($expectedAccounts);
+
+ $file = new CsvFile(self::DATA_DIR . 'tw_accounts.changedColumnsOrder.csv');
+ $accountChangedColumnsOrderHeader = $file->getHeader();
+
+ $file = new CsvFile(self::DATA_DIR . 'lemma.csv');
+ $expectedLemma = [];
+ foreach ($file as $row) {
+ $expectedLemma[] = $row;
+ }
+ $lemmaHeader = array_shift($expectedLemma);
+ $expectedLemma = array_values($expectedLemma);
+
+ // large sliced manifest
+ $expectedLargeSlicedManifest = [];
+ for ($i = 0; $i <= 1500; $i++) {
+ $expectedLargeSlicedManifest[] = ['a', 'b'];
+ }
+
+ yield 'large manifest' => [
+ $this->createS3SourceInstance(
+ 'sliced/2cols-large/S3.2cols-large.csvmanifest',
+ $escapingHeader,
+ true,
+ false,
+ []
+ ),
+ [$this->getDestinationDbName(), self::TABLE_OUT_CSV_2COLS],
+ $this->getSimpleImportOptions(ImportOptions::SKIP_NO_LINE),
+ $expectedLargeSlicedManifest,
+ 1501,
+ self::TABLE_OUT_CSV_2COLS,
+ ];
+
+ yield 'empty manifest' => [
+ $this->createS3SourceInstance(
+ 'empty.manifest',
+ $escapingHeader,
+ true,
+ false,
+ []
+ ),
+ [$this->getDestinationDbName(), self::TABLE_OUT_CSV_2COLS],
+ $this->getSimpleImportOptions(),
+ [],
+ 0,
+ self::TABLE_OUT_CSV_2COLS,
+ ];
+
+ yield 'lemma' => [
+ $this->createS3SourceInstance(
+ 'lemma.csv',
+ $lemmaHeader,
+ false,
+ false,
+ []
+ ),
+ [$this->getDestinationDbName(), self::TABLE_OUT_LEMMA],
+ $this->getSimpleImportOptions(),
+ $expectedLemma,
+ 5,
+ self::TABLE_OUT_LEMMA,
+ ];
+
+ yield 'standard with enclosures' => [
+ $this->createS3SourceInstance(
+ 'standard-with-enclosures.csv',
+ $escapingHeader,
+ false,
+ false,
+ []
+ ),
+ [$this->getDestinationDbName(), self::TABLE_OUT_CSV_2COLS],
+ $this->getSimpleImportOptions(),
+ $expectedEscaping,
+ 7,
+ self::TABLE_OUT_CSV_2COLS,
+ ];
+
+ yield 'gzipped standard with enclosure' => [
+ $this->createS3SourceInstance(
+ 'gzipped-standard-with-enclosures.csv.gz',
+ $escapingHeader,
+ false,
+ false,
+ []
+ ),
+ [$this->getDestinationDbName(), self::TABLE_OUT_CSV_2COLS],
+ $this->getSimpleImportOptions(),
+ $expectedEscaping,
+ 7,
+ self::TABLE_OUT_CSV_2COLS,
+ ];
+
+ yield 'standard with enclosures tabs' => [
+ $this->createS3SourceInstanceFromCsv(
+ 'standard-with-enclosures.tabs.csv',
+ new CsvOptions("\t"),
+ $escapingHeader,
+ false,
+ false,
+ []
+ ),
+ [$this->getDestinationDbName(), self::TABLE_OUT_CSV_2COLS],
+ $this->getSimpleImportOptions(),
+ $expectedEscaping,
+ 7,
+ self::TABLE_OUT_CSV_2COLS,
+ ];
+
+ yield 'accounts changedColumnsOrder' => [
+ $this->createS3SourceInstance(
+ 'tw_accounts.changedColumnsOrder.csv',
+ $accountChangedColumnsOrderHeader,
+ false,
+ false,
+ ['id']
+ ),
+ [
+ $this->getDestinationDbName(),
+ self::TABLE_ACCOUNTS_3,
+ ],
+ $this->getSimpleImportOptions(),
+ $expectedAccounts,
+ 3,
+ self::TABLE_ACCOUNTS_3,
+ ];
+
+ yield 'accounts' => [
+ $this->createS3SourceInstance(
+ 'tw_accounts.csv',
+ $accountsHeader,
+ false,
+ false,
+ ['id']
+ ),
+ [$this->getDestinationDbName(), self::TABLE_ACCOUNTS_3],
+ $this->getSimpleImportOptions(),
+ $expectedAccounts,
+ 3,
+ self::TABLE_ACCOUNTS_3,
+ ];
+
+ // line ending detection is not supported yet for S3
+ yield 'accounts crlf' => [
+ $this->createS3SourceInstance(
+ 'tw_accounts.crlf.csv',
+ $accountsHeader,
+ false,
+ false,
+ ['id']
+ ),
+ [$this->getDestinationDbName(), self::TABLE_ACCOUNTS_3],
+ $this->getSimpleImportOptions(),
+ $expectedAccounts,
+ 3,
+ self::TABLE_ACCOUNTS_3,
+ ];
+
+ // manifests
+ yield 'accounts sliced' => [
+ $this->createS3SourceInstance(
+ 'sliced/accounts/S3.accounts.csvmanifest',
+ $accountsHeader,
+ true,
+ false,
+ ['id']
+ ),
+ [$this->getDestinationDbName(), self::TABLE_ACCOUNTS_3],
+ $this->getSimpleImportOptions(ImportOptions::SKIP_NO_LINE),
+ $expectedAccounts,
+ 3,
+ self::TABLE_ACCOUNTS_3,
+ ];
+
+ yield 'accounts sliced gzip' => [
+ $this->createS3SourceInstance(
+ 'sliced/accounts-gzip/S3.accounts-gzip.csvmanifest',
+ $accountsHeader,
+ true,
+ false,
+ ['id']
+ ),
+ [$this->getDestinationDbName(), self::TABLE_ACCOUNTS_3],
+ $this->getSimpleImportOptions(ImportOptions::SKIP_NO_LINE),
+ $expectedAccounts,
+ 3,
+ self::TABLE_ACCOUNTS_3,
+ ];
+
+ // folder
+ yield 'accounts sliced folder import' => [
+ $this->createS3SourceInstance(
+ 'sliced_accounts_no_manifest/',
+ $accountsHeader,
+ true,
+ true,
+ ['id']
+ ),
+ [$this->getDestinationDbName(), self::TABLE_ACCOUNTS_3],
+ $this->getSimpleImportOptions(ImportOptions::SKIP_NO_LINE),
+ $expectedAccounts,
+ 3,
+ self::TABLE_ACCOUNTS_3,
+ ];
+
+ // reserved words
+ yield 'reserved words' => [
+ $this->createS3SourceInstance(
+ 'reserved-words.csv',
+ ['column', 'table'],
+ false,
+ false,
+ []
+ ),
+ [$this->getDestinationDbName(), self::TABLE_TABLE],
+ $this->getSimpleImportOptions(),
+ [['table', 'column', null]],
+ 1,
+ self::TABLE_TABLE,
+ ];
+ // import table with _timestamp columns - used by snapshots
+ yield 'import with _timestamp columns' => [
+ $this->createS3SourceInstance(
+ 'with-ts.csv',
+ [
+ 'col1',
+ 'col2',
+ '_timestamp',
+ ],
+ false,
+ false,
+ []
+ ),
+ [
+ $this->getDestinationDbName(),
+ self::TABLE_OUT_CSV_2COLS,
+ ],
+ $this->getSimpleImportOptions(ImportOptions::SKIP_NO_LINE),
+ [
+ ['a', 'b', '2014-11-10 13:12:06.000000'],
+ ['c', 'd', '2014-11-10 14:12:06.000000'],
+ ],
+ 2,
+ self::TABLE_OUT_CSV_2COLS,
+ ];
+ // test creating table without _timestamp column
+ yield 'table without _timestamp column' => [
+ $this->createS3SourceInstance(
+ 'standard-with-enclosures.csv',
+ $escapingHeader,
+ false,
+ false,
+ []
+ ),
+ [
+ $this->getDestinationDbName(),
+ self::TABLE_OUT_NO_TIMESTAMP_TABLE,
+ ],
+ $this->getImportOptions(
+ [],
+ false,
+ false, // don't use timestamp
+ ImportOptions::SKIP_FIRST_LINE
+ ),
+ $expectedEscaping,
+ 7,
+ self::TABLE_OUT_NO_TIMESTAMP_TABLE,
+ ];
+ // copy from table
+ yield 'copy from table' => [
+ new Table($this->getSourceDbName(), self::TABLE_OUT_CSV_2COLS, $escapingHeader),
+ [$this->getDestinationDbName(), self::TABLE_OUT_CSV_2COLS],
+ $this->getSimpleImportOptions(),
+ [['a', 'b'], ['c', 'd']],
+ 2,
+ self::TABLE_OUT_CSV_2COLS,
+ ];
+ yield 'copy from table 2' => [
+ new Table(
+ $this->getSourceDbName(),
+ self::TABLE_TYPES,
+ [
+ 'charCol',
+ 'numCol',
+ 'floatCol',
+ 'boolCol',
+ ]
+ ),
+ [
+ $this->getDestinationDbName(),
+ self::TABLE_TYPES,
+ ],
+ $this->getSimpleImportOptions(),
+ // TODO https://keboola.atlassian.net/browse/KBC-2526 it should cast 0.3 to "0.3"
+ [['a', '10.5', '3.00000000000000E-001', '1']],
+ 1,
+ self::TABLE_TYPES,
+ ];
+ }
+
+ /**
+ * @dataProvider fullImportData
+ * @param array $table
+ * @param array $expected
+ * @param string $tablesToInit
+ */
+ public function testFullImportWithDataSet(
+ SourceInterface $source,
+ array $table,
+ TeradataImportOptions $options,
+ array $expected,
+ int $expectedImportedRowCount,
+ string $tablesToInit
+ ): void {
+ $this->initTable($tablesToInit);
+
+ [$schemaName, $tableName] = $table;
+ /** @var TeradataTableDefinition $destination */
+ $destination = (new TeradataTableReflection(
+ $this->connection,
+ $schemaName,
+ $tableName
+ ))->getTableDefinition();
+
+ $stagingTable = StageTableDefinitionFactory::createStagingTableDefinition(
+ $destination,
+ $source->getColumnsNames()
+ );
+ $qb = new TeradataTableQueryBuilder();
+ $this->connection->executeStatement(
+ $qb->getCreateTableCommandFromDefinition($stagingTable)
+ );
+ $toStageImporter = new ToStageImporter($this->connection);
+ $toFinalTableImporter = new FullImporter($this->connection);
+ try {
+ $importState = $toStageImporter->importToStagingTable(
+ $source,
+ $stagingTable,
+ $options
+ );
+ $result = $toFinalTableImporter->importToTable(
+ $stagingTable,
+ $destination,
+ $options,
+ $importState
+ );
+ } finally {
+ if ($this->connection->fetchOne(
+ (new SqlBuilder())->getTableExistsCommand(
+ $stagingTable->getSchemaName(),
+ $stagingTable->getTableName()
+ )
+ ) > 0) {
+ $this->connection->executeStatement((new SqlBuilder())->getDropTableUnsafe(
+ $stagingTable->getSchemaName(),
+ $stagingTable->getTableName()
+ ));
+ }
+ }
+
+ self::assertEquals($expectedImportedRowCount, $result->getImportedRowsCount());
+
+ $this->assertTeradataTableEqualsExpected(
+ $source,
+ $destination,
+ $options,
+ $expected,
+ 0
+ );
+ }
+}
diff --git a/tests/functional/Teradata/SqlBuilderTest.php b/tests/functional/Teradata/SqlBuilderTest.php
new file mode 100644
index 00000000..c84a3c6b
--- /dev/null
+++ b/tests/functional/Teradata/SqlBuilderTest.php
@@ -0,0 +1,419 @@
+cleanDatabase(self::TEST_DB);
+ }
+
+ protected function getBuilder(): SqlBuilder
+ {
+ return new SqlBuilder();
+ }
+
+ protected function setUp(): void
+ {
+ parent::setUp();
+ $this->dropTestDb();
+ }
+
+ protected function createTestDb(): void
+ {
+ $this->createDatabase(self::TEST_DB);
+ }
+
+ public function testGetDedupCommand(): void
+ {
+ $this->markTestSkipped('not implemented');
+ }
+
+ private function createStagingTableWithData(bool $includeEmptyValues = false): TeradataTableDefinition
+ {
+ $def = $this->getStagingTableDefinition();
+ $qb = new TeradataTableQueryBuilder();
+ $this->connection->executeStatement($qb->getCreateTableCommandFromDefinition($def));
+
+ $this->connection->executeStatement(
+ sprintf(
+ 'INSERT INTO %s.%s("pk1","pk2","col1","col2") VALUES (1,1,\'1\',\'1\')',
+ self::TEST_DB_QUOTED,
+ self::TEST_STAGING_TABLE_QUOTED
+ )
+ );
+ $this->connection->executeStatement(
+ sprintf(
+ 'INSERT INTO %s.%s("pk1","pk2","col1","col2") VALUES (1,1,\'1\',\'1\')',
+ self::TEST_DB_QUOTED,
+ self::TEST_STAGING_TABLE_QUOTED
+ )
+ );
+ $this->connection->executeStatement(
+ sprintf(
+ 'INSERT INTO %s.%s("pk1","pk2","col1","col2") VALUES (2,2,\'2\',\'2\')',
+ self::TEST_DB_QUOTED,
+ self::TEST_STAGING_TABLE_QUOTED
+ )
+ );
+
+ if ($includeEmptyValues) {
+ $this->connection->executeStatement(
+ sprintf(
+ 'INSERT INTO %s.%s("pk1","pk2","col1","col2") VALUES (2,2,\'\',NULL)',
+ self::TEST_DB_QUOTED,
+ self::TEST_STAGING_TABLE_QUOTED
+ )
+ );
+ }
+
+ return $def;
+ }
+
+ public function testGetDeleteOldItemsCommand(): void
+ {
+ $this->markTestSkipped('not implemented');
+ }
+
+ private function assertTableNotExists(string $schemaName, string $tableName): void
+ {
+ try {
+ (new TeradataTableReflection($this->connection, $schemaName, $tableName))->getTableStats();
+ self::fail(sprintf(
+ 'Table "%s.%s" is expected to not exist.',
+ $schemaName,
+ $tableName
+ ));
+ } catch (Exception $e) {
+ }
+ }
+
+ public function testGetDropTableIfExistsCommand(): void
+ {
+ $this->createTestDb();
+ $this->assertTableNotExists(self::TEST_DB, self::TEST_TABLE);
+
+ // check that it cannot find non-existing table
+ $sql = $this->getBuilder()->getTableExistsCommand(self::TEST_DB, self::TEST_TABLE);
+ self::assertEquals(
+ // phpcs:ignore
+ "SELECT COUNT(*) FROM DBC.Tables WHERE DatabaseName = 'import-export-test_schema' AND TableName = 'import-export-test_test';", $sql
+ );
+ $this->assertEquals(0, $this->connection->fetchOne($sql));
+
+ // try to drop not existing table
+ try {
+ $sql = $this->getBuilder()->getDropTableUnsafe(self::TEST_DB, self::TEST_TABLE);
+ self::assertEquals(
+ // phpcs:ignore
+ 'DROP TABLE "import-export-test_schema"."import-export-test_test"',
+ $sql
+ );
+ $this->connection->executeStatement($sql);
+ } catch (DriverException $e) {
+ $this->assertContains('Base table or view not found', $e->getMessage());
+ }
+
+ // create table
+ $this->initSingleTable(self::TEST_DB, self::TEST_TABLE);
+
+ // check that the table exists already
+ $sql = $this->getBuilder()->getTableExistsCommand(self::TEST_DB, self::TEST_TABLE);
+ $this->assertEquals(1, $this->connection->fetchOne($sql));
+
+ // drop existing table
+ $sql = $this->getBuilder()->getDropTableUnsafe(self::TEST_DB, self::TEST_TABLE);
+ $this->connection->executeStatement($sql);
+
+ // check that the table doesn't exist anymore
+ $sql = $this->getBuilder()->getTableExistsCommand(self::TEST_DB, self::TEST_TABLE);
+ $this->assertEquals(0, $this->connection->fetchOne($sql));
+ }
+
+ public function testGetInsertAllIntoTargetTableCommand(): void
+ {
+ $this->createTestDb();
+ $destination = $this->createTestTableWithColumns();
+ $this->createStagingTableWithData(true);
+
+ // create fake stage and say that there is less columns
+ $fakeStage = new TeradataTableDefinition(
+ self::TEST_DB,
+ self::TEST_STAGING_TABLE,
+ true,
+ new ColumnCollection([
+ $this->createNullableGenericColumn('col1'),
+ $this->createNullableGenericColumn('col2'),
+ ]),
+ []
+ );
+
+ // no convert values no timestamp
+ $sql = $this->getBuilder()->getInsertAllIntoTargetTableCommand(
+ $fakeStage,
+ $destination,
+ $this->getImportOptions(),
+ '2020-01-01 00:00:00'
+ );
+
+ self::assertEquals(
+ // phpcs:ignore
+ 'INSERT INTO "import-export-test_schema"."import-export-test_test" ("col1", "col2") SELECT CAST(COALESCE("col1", \'\') as VARCHAR (50)) AS "col1",CAST(COALESCE("col2", \'\') as VARCHAR (50)) AS "col2" FROM "import-export-test_schema"."stagingTable" AS "src"',
+ $sql
+ );
+
+ $out = $this->connection->executeStatement($sql);
+ self::assertEquals(4, $out);
+
+ $result = $this->connection->fetchAllAssociative(sprintf(
+ 'SELECT * FROM %s.%s',
+ TeradataQuote::quoteSingleIdentifier(self::TEST_DB),
+ TeradataQuote::quoteSingleIdentifier(self::TEST_TABLE),
+ ));
+
+ self::assertEqualsCanonicalizing([
+ [
+ 'id' => null,
+ 'col1' => '1',
+ 'col2' => '1',
+ ],
+ [
+ 'id' => null,
+ 'col1' => '1',
+ 'col2' => '1',
+ ],
+ [
+ 'id' => null,
+ 'col1' => '2',
+ 'col2' => '2',
+ ],
+ [
+ 'id' => null,
+ 'col1' => '',
+ 'col2' => '',
+ ],
+ ], $result);
+ }
+
+ protected function createTestTableWithColumns(
+ bool $includeTimestamp = false,
+ bool $includePrimaryKey = false
+ ): TeradataTableDefinition {
+ $columns = [];
+ $pks = [];
+ if ($includePrimaryKey) {
+ $pks[] = 'id';
+ $columns[] = new TeradataColumn(
+ 'id',
+ new Teradata(Teradata::TYPE_INT)
+ );
+ } else {
+ $columns[] = $this->createNullableGenericColumn('id');
+ }
+ $columns[] = $this->createNullableGenericColumn('col1');
+ $columns[] = $this->createNullableGenericColumn('col2');
+
+ if ($includeTimestamp) {
+ $columns[] = new TeradataColumn(
+ '_timestamp',
+ new Teradata(Teradata::TYPE_TIMESTAMP)
+ );
+ }
+
+ $tableDefinition = new TeradataTableDefinition(
+ self::TEST_DB,
+ self::TEST_TABLE,
+ false,
+ new ColumnCollection($columns),
+ $pks
+ );
+ $this->connection->executeStatement(
+ (new TeradataTableQueryBuilder())->getCreateTableCommandFromDefinition($tableDefinition)
+ );
+
+ return $tableDefinition;
+ }
+
+ private function createNullableGenericColumn(string $columnName): TeradataColumn
+ {
+ $definition = new Teradata(
+ Teradata::TYPE_VARCHAR,
+ [
+ 'length' => '50', // should be changed to max in future
+ 'nullable' => true,
+ ]
+ );
+
+ return new TeradataColumn(
+ $columnName,
+ $definition
+ );
+ }
+
+ public function testGetInsertAllIntoTargetTableCommandConvertToNull(): void
+ {
+ $this->createTestDb();
+ $destination = $this->createTestTableWithColumns();
+
+ $this->createStagingTableWithData(true);
+ // create fake stage and say that there is less columns
+ $fakeStage = new TeradataTableDefinition(
+ self::TEST_DB,
+ self::TEST_STAGING_TABLE,
+ true,
+ new ColumnCollection([
+ $this->createNullableGenericColumn('col1'),
+ $this->createNullableGenericColumn('col2'),
+ ]),
+ []
+ );
+
+ // convert col1 to null
+ $options = $this->getImportOptions(['col1']);
+ $sql = $this->getBuilder()->getInsertAllIntoTargetTableCommand(
+ $fakeStage,
+ $destination,
+ $options,
+ '2020-01-01 00:00:00'
+ );
+ self::assertEquals(
+ // phpcs:ignore
+ 'INSERT INTO "import-export-test_schema"."import-export-test_test" ("col1", "col2") SELECT NULLIF("col1", \'\'),CAST(COALESCE("col2", \'\') as VARCHAR (50)) AS "col2" FROM "import-export-test_schema"."stagingTable" AS "src"',
+ $sql
+ );
+ $out = $this->connection->executeStatement($sql);
+ self::assertEquals(4, $out);
+
+ $result = $this->connection->fetchAllAssociative(sprintf(
+ 'SELECT * FROM %s',
+ self::TEST_TABLE_IN_DB
+ ));
+
+ self::assertEqualsCanonicalizing([
+ [
+ 'id' => null,
+ 'col1' => '1',
+ 'col2' => '1',
+ ],
+ [
+ 'id' => null,
+ 'col1' => '1',
+ 'col2' => '1',
+ ],
+ [
+ 'id' => null,
+ 'col1' => '2',
+ 'col2' => '2',
+ ],
+ [
+ 'id' => null,
+ 'col1' => null,
+ 'col2' => '',
+ ],
+ ], $result);
+ }
+
+ public function testGetInsertAllIntoTargetTableCommandConvertToNullWithTimestamp(): void
+ {
+ $this->createTestDb();
+ $destination = $this->createTestTableWithColumns(true);
+ $this->createStagingTableWithData(true);
+ // create fake stage and say that there is less columns
+ $fakeStage = new TeradataTableDefinition(
+ self::TEST_DB,
+ self::TEST_STAGING_TABLE,
+ true,
+ new ColumnCollection([
+ $this->createNullableGenericColumn('col1'),
+ $this->createNullableGenericColumn('col2'),
+ ]),
+ []
+ );
+
+ // use timestamp
+ $options = $this->getImportOptions(['col1'], false, true);
+ $sql = $this->getBuilder()->getInsertAllIntoTargetTableCommand(
+ $fakeStage,
+ $destination,
+ $options,
+ '2020-01-01 00:00:00'
+ );
+ self::assertEquals(
+ // phpcs:ignore
+ 'INSERT INTO "import-export-test_schema"."import-export-test_test" ("col1", "col2", "_timestamp") SELECT NULLIF("col1", \'\'),CAST(COALESCE("col2", \'\') as VARCHAR (50)) AS "col2",\'2020-01-01 00:00:00\' FROM "import-export-test_schema"."stagingTable" AS "src"',
+ $sql
+ );
+ $out = $this->connection->executeStatement($sql);
+ self::assertEquals(4, $out);
+
+ $result = $this->connection->fetchAllAssociative(sprintf(
+ 'SELECT * FROM %s',
+ self::TEST_TABLE_IN_DB
+ ));
+
+ foreach ($result as $item) {
+ self::assertArrayHasKey('id', $item);
+ self::assertArrayHasKey('col1', $item);
+ self::assertArrayHasKey('col2', $item);
+ self::assertArrayHasKey('_timestamp', $item);
+ }
+ }
+
+ public function testGetTruncateTableWithDeleteCommand(): void
+ {
+ $this->createTestDb();
+ $this->createStagingTableWithData();
+
+ $ref = new TeradataTableReflection($this->connection, self::TEST_DB, self::TEST_STAGING_TABLE);
+ self::assertEquals(3, $ref->getRowsCount());
+
+ $sql = $this->getBuilder()->getTruncateTableWithDeleteCommand(self::TEST_DB, self::TEST_STAGING_TABLE);
+ self::assertEquals(
+ 'DELETE FROM "import-export-test_schema"."stagingTable"',
+ $sql
+ );
+ $this->connection->executeStatement($sql);
+ self::assertEquals(0, $ref->getRowsCount());
+ }
+
+ private function getStagingTableDefinition(): TeradataTableDefinition
+ {
+ return new TeradataTableDefinition(
+ self::TEST_DB,
+ self::TEST_STAGING_TABLE,
+ true,
+ new ColumnCollection([
+ $this->createNullableGenericColumn('pk1'),
+ $this->createNullableGenericColumn('pk2'),
+ $this->createNullableGenericColumn('col1'),
+ $this->createNullableGenericColumn('col2'),
+ ]),
+ []
+ );
+ }
+}
diff --git a/tests/functional/Teradata/StageImportTest.php b/tests/functional/Teradata/StageImportTest.php
new file mode 100644
index 00000000..1e0bc1ab
--- /dev/null
+++ b/tests/functional/Teradata/StageImportTest.php
@@ -0,0 +1,57 @@
+cleanDatabase(self::TEST_DATABASE);
+ $this->createDatabase(self::TEST_DATABASE);
+ }
+
+ public function testSimpleStageImport(): void
+ {
+ $this->connection->executeQuery(
+ sprintf(
+ 'CREATE MULTISET TABLE %s.%s ,NO FALLBACK
+ (
+ "id" INTEGER NOT NULL,
+ "first_name" CHAR(50),
+ "last_name" CHAR(50)
+ );',
+ TeradataQuote::quoteSingleIdentifier(self::TEST_DATABASE),
+ TeradataQuote::quoteSingleIdentifier(self::TABLE_GENERIC)
+ )
+ );
+
+ $importer = new ToStageImporter($this->connection);
+ $ref = new TeradataTableReflection(
+ $this->connection,
+ self::TEST_DATABASE,
+ self::TABLE_GENERIC
+ );
+
+ $state = $importer->importToStagingTable(
+ $this->createS3SourceInstanceFromCsv('csv/simple/a_b_c-1row.csv', new CsvOptions()),
+ $ref->getTableDefinition(),
+ $this->getImportOptions(
+ [],
+ false,
+ false,
+ 1
+ )
+ );
+
+ $this->assertEquals(1, $state->getResult()->getImportedRowsCount());
+ }
+}
diff --git a/tests/functional/Teradata/TeradataBaseTestCase.php b/tests/functional/Teradata/TeradataBaseTestCase.php
new file mode 100644
index 00000000..e9a9e3b0
--- /dev/null
+++ b/tests/functional/Teradata/TeradataBaseTestCase.php
@@ -0,0 +1,443 @@
+connection = $this->getTeradataConnection();
+ }
+
+ protected function getSourceDbName(): string
+ {
+ return self::TERADATA_SOURCE_DATABASE_NAME
+ . '-'
+ . getenv('SUITE');
+ }
+
+ protected function getDestinationDbName(): string
+ {
+ return self::TERADATA_DESTINATION_DATABASE_NAME
+ . '-'
+ . getenv('SUITE');
+ }
+
+ private function getTeradataConnection(): Connection
+ {
+ return TeradataConnection::getConnection([
+ 'host' => (string) getenv('TERADATA_HOST'),
+ 'user' => (string) getenv('TERADATA_USERNAME'),
+ 'password' => (string) getenv('TERADATA_PASSWORD'),
+ 'port' => (int) getenv('TERADATA_PORT'),
+ 'dbname' => '',
+ ]);
+ }
+
+ /**
+ * @param string[] $columnsNames
+ */
+ public function getColumnsWithoutTypes(array $columnsNames): ColumnCollection
+ {
+ $columns = array_map(function ($colName) {
+ return new TeradataColumn(
+ $colName,
+ new Teradata(
+ Teradata::TYPE_VARCHAR,
+ ['length' => 4000]
+ )
+ );
+ }, $columnsNames);
+ return new ColumnCollection($columns);
+ }
+
+ /**
+ * @param string[] $columns
+ * @param string[] $pks
+ */
+ public function getGenericTableDefinition(
+ string $schemaName,
+ string $tableName,
+ array $columns,
+ array $pks = []
+ ): TeradataTableDefinition {
+ return new TeradataTableDefinition(
+ $schemaName,
+ $tableName,
+ false,
+ $this->getColumnsWithoutTypes($columns),
+ $pks
+ );
+ }
+
+ protected function cleanDatabase(string $dbname): void
+ {
+ if (!$this->dbExists($dbname)) {
+ return;
+ }
+
+ // delete all objects in the DB
+ $this->connection->executeQuery(
+ sprintf('DELETE DATABASE %s ALL', TeradataQuote::quoteSingleIdentifier($dbname))
+ );
+ // drop the empty db
+ $this->connection->executeQuery(
+ sprintf('DROP DATABASE %s', TeradataQuote::quoteSingleIdentifier($dbname))
+ );
+ }
+
+ public function createDatabase(string $dbName): void
+ {
+ $this->connection->executeQuery(sprintf('
+CREATE DATABASE %s AS
+ PERM = 5e7
+ SPOOL = 5e7;
+
+ ', TeradataQuote::quoteSingleIdentifier($dbName)));
+ }
+
+ protected function dbExists(string $dbname): bool
+ {
+ try {
+ $this->connection->executeQuery(sprintf('HELP DATABASE %s', TeradataQuote::quoteSingleIdentifier($dbname)));
+ return true;
+ } catch (\Doctrine\DBAL\Exception $e) {
+ // 3802 = "Database does not exist"; strpos() may return 0, so compare strictly — https://docs.teradata.com/r/GVKfXcemJFkTJh_89R34UQ/j2TdlzqRJ9LpndY3efMdlw
+ if (strpos($e->getMessage(), '3802') !== false) {
+ return false;
+ }
+ throw $e;
+ }
+ }
+
+ protected function initSingleTable(
+ string $db = self::TERADATA_SOURCE_DATABASE_NAME,
+ string $table = self::TABLE_TABLE
+ ): void {
+ if (!$this->dbExists($db)) {
+ $this->createDatabase($db);
+ }
+ // char because of Stats test
+ $this->connection->executeQuery(
+ sprintf(
+ 'CREATE MULTISET TABLE %s.%s, NO FALLBACK
+ (
+"Other" VARCHAR(50)
+ );',
+ TeradataQuote::quoteSingleIdentifier($db),
+ TeradataQuote::quoteSingleIdentifier($table)
+ )
+ );
+ }
+
+ protected function initTable(string $tableName): void
+ {
+ switch ($tableName) {
+ case self::TABLE_OUT_CSV_2COLS_WITHOUT_TS:
+ $this->connection->executeQuery(
+ sprintf(
+ 'CREATE MULTISET TABLE %s.%s, NO FALLBACK
+ (
+"VisitID" VARCHAR(50) CHARACTER SET UNICODE,
+"Value" VARCHAR(50),
+"MenuItem" VARCHAR(50),
+"Something" VARCHAR(50),
+"Other" VARCHAR(50)
+ )
+PRIMARY INDEX ("VisitID")
+ ;',
+ TeradataQuote::quoteSingleIdentifier($this->getDestinationDbName()),
+ TeradataQuote::quoteSingleIdentifier($tableName)
+ )
+ );
+ break;
+ case self::TABLE_COLUMN_NAME_ROW_NUMBER:
+ $this->connection->executeQuery(sprintf(
+ 'CREATE MULTISET TABLE %s.%s, NO FALLBACK
+ (
+ "id" VARCHAR(50) CHARACTER SET UNICODE,
+ "row_number" VARCHAR(50)
+ )',
+ TeradataQuote::quoteSingleIdentifier($this->getDestinationDbName()),
+ TeradataQuote::quoteSingleIdentifier($tableName)
+ ));
+ break;
+ case self::TABLE_TRANSLATIONS:
+ $this->connection->executeQuery(sprintf(
+ 'CREATE MULTISET TABLE %s.%s, NO FALLBACK
+ (
+ "id" INT ,
+ "name" VARCHAR(50) CHARACTER SET UNICODE,
+ "price" INT ,
+ "isDeleted" INT
+ )',
+ TeradataQuote::quoteSingleIdentifier($this->getDestinationDbName()),
+ TeradataQuote::quoteSingleIdentifier($tableName)
+ ));
+ break;
+ case self::TABLE_OUT_CSV_2COLS:
+ $this->connection->executeQuery(
+ sprintf(
+ 'CREATE MULTISET TABLE %s.%s, NO FALLBACK (
+ "col1" VARCHAR(200) ,
+ "col2" VARCHAR(200) ,
+ "_timestamp" TIMESTAMP
+ );',
+ TeradataQuote::quoteSingleIdentifier($this->getDestinationDbName()),
+ TeradataQuote::quoteSingleIdentifier($tableName)
+ )
+ );
+
+ $this->connection->executeQuery(sprintf(
+ 'INSERT INTO %s.%s VALUES (\'x\', \'y\', CURRENT_TIMESTAMP);',
+ TeradataQuote::quoteSingleIdentifier($this->getDestinationDbName()),
+ TeradataQuote::quoteSingleIdentifier($tableName)
+ ));
+
+ $this->connection->executeQuery(sprintf(
+ 'CREATE MULTISET TABLE %s.%s, NO FALLBACK (
+ "col1" VARCHAR(50) CHARACTER SET UNICODE,
+ "col2" VARCHAR(50)
+ );',
+ TeradataQuote::quoteSingleIdentifier($this->getSourceDbName()),
+ TeradataQuote::quoteSingleIdentifier($tableName)
+ ));
+
+ $this->connection->executeQuery(sprintf(
+ 'INSERT INTO %s.%s VALUES (\'a\', \'b\');',
+ TeradataQuote::quoteSingleIdentifier($this->getSourceDbName()),
+ TeradataQuote::quoteSingleIdentifier($tableName)
+ ));
+
+ $this->connection->executeQuery(sprintf(
+ 'INSERT INTO %s.%s VALUES (\'c\', \'d\');',
+ TeradataQuote::quoteSingleIdentifier($this->getSourceDbName()),
+ TeradataQuote::quoteSingleIdentifier($tableName)
+ ));
+ break;
+ case self::TABLE_OUT_LEMMA:
+ $this->connection->executeQuery(sprintf(
+ 'CREATE MULTISET TABLE %s.%s, NO FALLBACK (
+ "ts" VARCHAR(50) ,
+ "lemma" VARCHAR(50) ,
+ "lemmaIndex" VARCHAR(50) CHARACTER SET UNICODE,
+ "_timestamp" TIMESTAMP
+ );',
+ TeradataQuote::quoteSingleIdentifier($this->getDestinationDbName()),
+ TeradataQuote::quoteSingleIdentifier($tableName)
+ ));
+ break;
+ case self::TABLE_ACCOUNTS_3:
+ $this->connection->executeQuery(sprintf(
+ 'CREATE MULTISET TABLE %s.%s, NO FALLBACK (
+ "id" VARCHAR(50) CHARACTER SET UNICODE,
+ "idTwitter" VARCHAR(50) CHARACTER SET UNICODE,
+ "name" VARCHAR(100) CHARACTER SET UNICODE,
+ "import" VARCHAR(50) CHARACTER SET UNICODE,
+ "isImported" VARCHAR(50) CHARACTER SET UNICODE,
+ "apiLimitExceededDatetime" VARCHAR(50) CHARACTER SET UNICODE,
+ "analyzeSentiment" VARCHAR(50) CHARACTER SET UNICODE,
+ "importKloutScore" VARCHAR(50) CHARACTER SET UNICODE,
+ "timestamp" VARCHAR(50) CHARACTER SET UNICODE,
+ "oauthToken" VARCHAR(50) CHARACTER SET UNICODE,
+ "oauthSecret" VARCHAR(50) CHARACTER SET UNICODE,
+ "idApp" VARCHAR(50) CHARACTER SET UNICODE,
+ "_timestamp" TIMESTAMP
+ ) PRIMARY INDEX ("id");',
+ TeradataQuote::quoteSingleIdentifier($this->getDestinationDbName()),
+ TeradataQuote::quoteSingleIdentifier($tableName)
+ ));
+ break;
+ case self::TABLE_TABLE:
+ $this->connection->executeQuery(sprintf(
+ 'CREATE MULTISET TABLE %s.%s, NO FALLBACK (
+ "column" VARCHAR(50) ,
+ "table" VARCHAR(50) ,
+ "lemmaIndex" VARCHAR(50) CHARACTER SET UNICODE,
+ "_timestamp" TIMESTAMP
+ );',
+ TeradataQuote::quoteSingleIdentifier($this->getDestinationDbName()),
+ TeradataQuote::quoteSingleIdentifier($tableName)
+ ));
+ break;
+ case self::TABLE_OUT_NO_TIMESTAMP_TABLE:
+ $this->connection->executeQuery(sprintf(
+ 'CREATE MULTISET TABLE %s.%s, NO FALLBACK (
+ "col1" VARCHAR(50) ,
+ "col2" VARCHAR(50)
+ );',
+ TeradataQuote::quoteSingleIdentifier($this->getDestinationDbName()),
+ TeradataQuote::quoteSingleIdentifier($tableName)
+ ));
+ break;
+
+ case self::TABLE_TYPES:
+ $this->connection->executeQuery(sprintf(
+ 'CREATE TABLE %s."types" (
+ "charCol" VARCHAR(50) CHARACTER SET UNICODE,
+ "numCol" VARCHAR(50) CHARACTER SET UNICODE,
+ "floatCol" VARCHAR(50) CHARACTER SET UNICODE,
+ "boolCol" VARCHAR(50) CHARACTER SET UNICODE,
+ "_timestamp" TIMESTAMP
+ );',
+ TeradataQuote::quoteSingleIdentifier($this->getDestinationDbName())
+ ));
+
+ $this->connection->executeQuery(sprintf(
+ 'CREATE TABLE %s."types" (
+ "charCol" VARCHAR(50) CHARACTER SET UNICODE,
+ "numCol" decimal(10,1) ,
+ "floatCol" float ,
+ "boolCol" BYTEINT
+ );',
+ TeradataQuote::quoteSingleIdentifier($this->getSourceDbName())
+ ));
+ $this->connection->executeQuery(sprintf(
+ 'INSERT INTO %s."types" VALUES
+ (\'a\', \'10.5\', \'0.3\', 1)
+ ;',
+ TeradataQuote::quoteSingleIdentifier($this->getSourceDbName())
+ ));
+ break;
+ default:
+ throw new \Exception('unknown table');
+ }
+ }
+
+ /**
+ * @param string[] $convertEmptyValuesToNull
+ */
+ protected function getImportOptions(
+ array $convertEmptyValuesToNull = [],
+ bool $isIncremental = false,
+ bool $useTimestamp = false,
+ int $numberOfIgnoredLines = 0
+ ): TeradataImportOptions {
+ return
+ new TeradataImportOptions(
+ (string) getenv('TERADATA_HOST'), // connection settings come from env vars (see .env.dist TERADATA_*)
+ (string) getenv('TERADATA_USERNAME'),
+ (string) getenv('TERADATA_PASSWORD'),
+ (int) getenv('TERADATA_PORT'), // getenv() returns false when unset; (int) false === 0 — TODO confirm intended
+ $convertEmptyValuesToNull,
+ $isIncremental,
+ $useTimestamp,
+ $numberOfIgnoredLines, // trailing comma in call arguments requires PHP >= 7.3 (image runs 7.4)
+ );
+ }
+
+ protected function getSimpleImportOptions(
+ int $skipLines = ImportOptions::SKIP_FIRST_LINE, // by default skip the CSV header row
+ bool $useTimestamp = true
+ ): TeradataImportOptions {
+ return
+ new TeradataImportOptions(
+ (string) getenv('TERADATA_HOST'), // credentials from env, same as getImportOptions()
+ (string) getenv('TERADATA_USERNAME'),
+ (string) getenv('TERADATA_PASSWORD'),
+ (int) getenv('TERADATA_PORT'),
+ [], // no empty-string-to-NULL conversion
+ false, // full load, not incremental
+ $useTimestamp,
+ $skipLines,
+ );
+ }
+
+ /**
+ * Asserts that the destination table content equals $expected (sorted comparison).
+ * @param array $expected
+ * @param int|string $sortKey
+ */
+ protected function assertTeradataTableEqualsExpected(
+ SourceInterface $source,
+ TeradataTableDefinition $destination,
+ TeradataImportOptions $options,
+ array $expected,
+ $sortKey,
+ string $message = 'Imported tables are not the same as expected'
+ ): void {
+ $tableColumns = (new TeradataTableReflection(
+ $this->connection,
+ $destination->getSchemaName(),
+ $destination->getTableName()
+ ))->getColumnsNames(); // actual column list of the imported table
+
+ if ($options->useTimestamp()) {
+ self::assertContains('_timestamp', $tableColumns); // timestamped imports must have created the _timestamp column
+ } else {
+ self::assertNotContains('_timestamp', $tableColumns);
+ }
+
+ if (!in_array('_timestamp', $source->getColumnsNames(), true)) {
+ $tableColumns = array_filter($tableColumns, static function ($column) {
+ return $column !== '_timestamp'; // exclude _timestamp from data comparison when the source has no such column
+ });
+ }
+
+ $tableColumns = array_map(static function ($column) {
+ return sprintf('%s', $column); // NOTE(review): identity mapping — no-op, candidate for removal
+ }, $tableColumns);
+
+ $sql = sprintf(
+ 'SELECT %s FROM %s.%s',
+ implode(', ', array_map(static function ($item) {
+ return TeradataQuote::quoteSingleIdentifier($item); // quoting lets reserved words ("table", "column") work as identifiers
+ }, $tableColumns)),
+ TeradataQuote::quoteSingleIdentifier($destination->getSchemaName()),
+ TeradataQuote::quoteSingleIdentifier($destination->getTableName())
+ );
+
+ $queryResult = array_map(static function ($row) {
+ return array_map(static function ($column) {
+ return $column; // NOTE(review): inner map is an identity — array_values($row) alone would suffice
+ }, array_values($row));
+ }, $this->connection->fetchAllAssociative($sql)); // drop column names; compare positionally
+
+ $this->assertArrayEqualsSorted(
+ $expected,
+ $queryResult,
+ $sortKey, // rows sorted by this key before comparison, so insertion order does not matter
+ $message
+ );
+ }
+}
diff --git a/tests/unit/Backend/Teradata/HelperTest.php b/tests/unit/Backend/Teradata/HelperTest.php
new file mode 100644
index 00000000..f67000bc
--- /dev/null
+++ b/tests/unit/Backend/Teradata/HelperTest.php
@@ -0,0 +1,67 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Tests\Keboola\Db\ImportExportUnit\Backend\Teradata;
+
+use Keboola\Db\ImportExport\Backend\Teradata\Helper\BackendHelper;
+use Keboola\Db\ImportExport\Storage\S3\SourceFile;
+use PHPUnit\Framework\TestCase;
+
+class HelperTest extends TestCase
+{
+    /**
+     * @dataProvider dataProvider
+     * @param string[] $entriesData
+     */
+    public function testGetMask(string $expected, array $entriesData): void
+    {
+        $mock = $this->createMock(SourceFile::class);
+
+ $mock->method('getManifestEntries')
+ ->willReturn($entriesData);
+ $mock->method('getS3Prefix')
+ ->willReturn('s3://zajca-php-db-import-test-s3filesbucket-bwdj3sk0c9xy');
+ $this->assertEquals($expected, BackendHelper::getMask($mock));
+ }
+
+ /**
+ * @return array[]
+ */
+ public function dataProvider(): array
+ {
+ return [
+ [
+ 'sliced/accounts-gzip/tw_accounts.csv.gz000*_part_00.gz', // expected mask: varying digit replaced by *
+ [
+ // phpcs:ignore
+ 's3://zajca-php-db-import-test-s3filesbucket-bwdj3sk0c9xy/sliced/accounts-gzip/tw_accounts.csv.gz0001_part_00.gz',
+ // phpcs:ignore
+ 's3://zajca-php-db-import-test-s3filesbucket-bwdj3sk0c9xy/sliced/accounts-gzip/tw_accounts.csv.gz0002_part_00.gz',
+ ],
+ ],
+ [
+ 'sliced.csv_*', // numeric suffixes of differing length collapse to a single *
+ [
+ 'sliced.csv_1001',
+ 'sliced.csv_0',
+ 'sliced.csv_1',
+ 'sliced.csv_10',
+ 'sliced.csv_100',
+ 'sliced.csv_1002',
+ ],
+ ],
+ [
+ 'sliced0****', // one * per varying character position — presumably covers the longest entry; TODO confirm
+ [
+ 'sliced0122.csv',
+ 'sliced012.csv',
+ 'sliced01.csv',
+ 'sliced01999.csv',
+ 'sliced0.csv',
+ 'sliced034.csv',
+ ],
+ ],
+ ];
+ }
+}
diff --git a/tests/unit/Storage/S3/SourceFileTest.php b/tests/unit/Storage/S3/SourceFileTest.php
index dad7e174..1156c0cc 100644
--- a/tests/unit/Storage/S3/SourceFileTest.php
+++ b/tests/unit/Storage/S3/SourceFileTest.php
@@ -24,4 +24,11 @@ public function testDefaultValues(): void
self::assertEquals([], $source->getColumnsNames());
self::assertNull($source->getPrimaryKeysNames());
}
+
+ public function testGetFilepathParts(): void
+ {
+ $source = $this->createDummyS3SourceInstance('data/shared/file.csv');
+ self::assertEquals('data/shared/', $source->getPrefix()); // prefix keeps its trailing slash
+ self::assertEquals('file.csv', $source->getFileName()); // file name carries no path component
+ }
}