From 72004273b053cd0d2f3e473ba30a4bbb5cd40726 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 31 Oct 2024 12:18:37 +0100 Subject: [PATCH] #508 Fix the way descriptions are fetched for JDBC tables. --- .../core/reader/TableReaderJdbcNative.scala | 2 +- .../pramen/core/utils/JdbcSparkUtils.scala | 59 ++++++++++++------- 2 files changed, 38 insertions(+), 23 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scala index e9f597b24..d9dc6db38 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scala @@ -89,7 +89,7 @@ class TableReaderJdbcNative(jdbcReaderConfig: TableReaderJdbcConfig, } if (jdbcReaderConfig.enableSchemaMetadata) { - JdbcSparkUtils.withJdbcMetadata(jdbcReaderConfig.jdbcConfig, sql) { (connection, _) => + JdbcSparkUtils.withJdbcMetadata(jdbcConfig, sql) { (connection, _) => val schemaWithColumnDescriptions = tableOpt match { case Some(table) => log.info(s"Reading JDBC metadata descriptions the table: $table") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcSparkUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcSparkUtils.scala index b89cfbda5..294ca4459 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcSparkUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcSparkUtils.scala @@ -143,30 +143,34 @@ object JdbcSparkUtils { def getColumnMetadata(fullTableName: String, connection: Connection): ResultSet = { val dbMetadata: DatabaseMetaData = connection.getMetaData - if (!dbMetadata.getColumns(null, null, fullTableName, null).next()) { - val parts = fullTableName.split('.') - if (parts.length == 3) { - // database, schema, and table table are all present - 
dbMetadata.getColumns(parts(0), parts(1), parts(2), null) - } else if (parts.length == 2) { - if (dbMetadata.getColumns(null, parts(0), parts(1), null).next()) { - dbMetadata.getColumns(null, parts(0), parts(1), null) - // schema and table only - } else { - // database and table only - dbMetadata.getColumns(parts(0), null, parts(1), null) - } + val parts = fullTableName.split('.') + if (parts.length == 3) { + // database, schema, and table are all present + dbMetadata.getColumns(parts(0), parts(1), parts(2), null) + } else if (parts.length == 2) { + val rs = dbMetadata.getColumns(null, parts(0), parts(1), null) + if (rs.isBeforeFirst) { + rs + // schema and table only + } else { + // database and table only + dbMetadata.getColumns(parts(0), null, parts(1), null) + } + } else { + // Table only. + val rs = dbMetadata.getColumns(null, null, fullTableName, null) + + if (rs.isBeforeFirst) { + rs } else { - // Table only. The exact casing was already checked. Checking upper and lower casing in case - // the JDBC driver is case-sensitive, but objects ub db metadata are automatically upper- or lower- cased. - if (dbMetadata.getColumns(null, null, fullTableName.toUpperCase, null).next()) - dbMetadata.getColumns(null, null, fullTableName.toUpperCase, null) + // The exact casing was already checked. Checking upper and lower casing in case + // the JDBC driver is case-sensitive, but objects in db metadata are automatically upper- or lower- cased (HSQLDB). + val rsUpper = dbMetadata.getColumns(null, null, fullTableName.toUpperCase, null) + if (rsUpper.isBeforeFirst) + rsUpper else dbMetadata.getColumns(null, null, fullTableName.toLowerCase, null) } - } else { - // table only - dbMetadata.getColumns(null, null, fullTableName, null) } } @@ -185,11 +189,22 @@ object JdbcSparkUtils { connection.setAutoCommit(false) + /** If not filtered out, some JDBC drivers will try to receive all data before closing the result set. 
+ * TODO Fix this properly using SQL generators by adding a generator for the schema query. */ + val q = if (nativeQuery.toLowerCase.contains(" where ")) { + nativeQuery + " AND 0=1" + } else { + nativeQuery + " WHERE 0=1" + } + log.info(s"Successfully connected to JDBC URL: $url") + log.info(s"Getting metadata for: $q") try { - withMetadataResultSet(connection, nativeQuery) { rs => - action(connection, rs.getMetaData) + withMetadataResultSet(connection, q) { rs => + val metadata = rs.getMetaData + rs.close() + action(connection, metadata) } } finally { connection.close()