Merge support/1.9 for the v1.9.11 #512

Merged · 7 commits · Nov 1, 2024
TableReaderJdbcNative.scala

@@ -99,7 +99,7 @@ class TableReaderJdbcNative(jdbcReaderConfig: TableReaderJdbcConfig,
     }

     if (jdbcReaderConfig.enableSchemaMetadata) {
-      JdbcSparkUtils.withJdbcMetadata(jdbcReaderConfig.jdbcConfig, sql) { (connection, _) =>
+      JdbcSparkUtils.withJdbcMetadata(jdbcConfig, sql) { (connection, _) =>
         val schemaWithColumnDescriptions = tableOpt match {
           case Some(table) =>
             log.info(s"Reading JDBC metadata descriptions for the table: $table")
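
Review note: the fix makes the metadata lookup use the locally resolved `jdbcConfig` instead of reaching back to `jdbcReaderConfig.jdbcConfig`, so whatever resolution happened earlier in the method is presumably no longer bypassed. For context, `withJdbcMetadata` follows the loan pattern: open a connection, run a metadata-only query, lend the live `Connection` and its `ResultSetMetaData` to a callback, and close everything afterwards (its body is partly visible in the second file below). A minimal sketch of that pattern, with plain url/user/password parameters standing in for the real `JdbcConfig`:

import java.sql.{Connection, DriverManager, ResultSetMetaData}

// Sketch only: url/user/password are hypothetical placeholders for the
// connection details the real method resolves from its JdbcConfig argument.
def withJdbcMetadata(url: String, user: String, password: String, query: String)
                    (action: (Connection, ResultSetMetaData) => Unit): Unit = {
  val connection = DriverManager.getConnection(url, user, password)
  try {
    val statement = connection.createStatement()
    try {
      val rs = statement.executeQuery(query)
      try {
        action(connection, rs.getMetaData) // the caller borrows both resources
      } finally rs.close()
    } finally statement.close()
  } finally connection.close() // closed even if the action throws
}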
JdbcSparkUtils.scala

@@ -143,30 +143,34 @@ object JdbcSparkUtils {
   def getColumnMetadata(fullTableName: String, connection: Connection): ResultSet = {
     val dbMetadata: DatabaseMetaData = connection.getMetaData

-    if (!dbMetadata.getColumns(null, null, fullTableName, null).next()) {
-      val parts = fullTableName.split('.')
-      if (parts.length == 3) {
-        // database, schema, and table are all present
-        dbMetadata.getColumns(parts(0), parts(1), parts(2), null)
-      } else if (parts.length == 2) {
-        if (dbMetadata.getColumns(null, parts(0), parts(1), null).next()) {
-          dbMetadata.getColumns(null, parts(0), parts(1), null)
-          // schema and table only
-        } else {
-          // database and table only
-          dbMetadata.getColumns(parts(0), null, parts(1), null)
-        }
-      } else {
-        // Table only. The exact casing was already checked. Checking upper and lower casing in case
-        // the JDBC driver is case-sensitive, but objects in db metadata are automatically upper- or lower-cased.
-        if (dbMetadata.getColumns(null, null, fullTableName.toUpperCase, null).next())
-          dbMetadata.getColumns(null, null, fullTableName.toUpperCase, null)
-        else
-          dbMetadata.getColumns(null, null, fullTableName.toLowerCase, null)
-      }
-    } else {
-      // table only
-      dbMetadata.getColumns(null, null, fullTableName, null)
-    }
+    val parts = fullTableName.split('.')
+    if (parts.length == 3) {
+      // database, schema, and table are all present
+      dbMetadata.getColumns(parts(0), parts(1), parts(2), null)
+    } else if (parts.length == 2) {
+      val rs = dbMetadata.getColumns(null, parts(0), parts(1), null)
+      if (rs.isBeforeFirst) {
+        rs
+        // schema and table only
+      } else {
+        // database and table only
+        dbMetadata.getColumns(parts(0), null, parts(1), null)
+      }
+    } else {
+      // Table only.
+      val rs = dbMetadata.getColumns(null, null, fullTableName, null)
+
+      if (rs.isBeforeFirst) {
+        rs
+      } else {
+        // The exact casing was already checked. Checking upper and lower casing in case
+        // the JDBC driver is case-sensitive, but objects in db metadata are automatically upper- or lower-cased (HSQLDB).
+        val rsUpper = dbMetadata.getColumns(null, null, fullTableName.toUpperCase, null)
+        if (rsUpper.isBeforeFirst)
+          rsUpper
+        else
+          dbMetadata.getColumns(null, null, fullTableName.toLowerCase, null)
+      }
+    }
   }
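
Review note: the rework replaces emptiness checks via `ResultSet.next()` with `ResultSet.isBeforeFirst`. The difference is that `next()` advances the cursor, so the old code had to re-issue the same `getColumns` call to get a fresh result set, whereas `isBeforeFirst` reports whether any rows exist without moving the cursor, so the result set already in hand can be returned. (Per the JDBC javadoc, `isBeforeFirst` support is optional for `TYPE_FORWARD_ONLY` result sets, so this assumes the driver's metadata result sets support the call.) The same probing cascade, sketched as a reusable helper; `firstNonEmpty` is a hypothetical name, not part of the PR:

import java.sql.ResultSet

// Run candidate lookups lazily and return the first non-empty result set
// without consuming any of its rows; fall back to the last candidate.
def firstNonEmpty(candidates: (() => ResultSet)*): ResultSet = {
  val it = candidates.iterator
  var rs = it.next()()  // evaluate the first lookup
  while (!rs.isBeforeFirst && it.hasNext) {
    rs.close()          // discard the empty result set
    rs = it.next()()    // and try the next candidate
  }
  rs
}

// Usage mirroring the table-only branch above:
//   firstNonEmpty(
//     () => dbMetadata.getColumns(null, null, name, null),
//     () => dbMetadata.getColumns(null, null, name.toUpperCase, null),
//     () => dbMetadata.getColumns(null, null, name.toLowerCase, null))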

JdbcSparkUtils.scala (same file, second hunk)

@@ -185,10 +189,19 @@ object JdbcSparkUtils {

     connection.setAutoCommit(false)

+    /** If not filtered out, some JDBC drivers will try to receive all data before closing the result set.
+      * ToDo Fix this properly using SQL generators by adding a generator for the schema query. */
+    val q = if (nativeQuery.toLowerCase.contains(" where ")) {
+      nativeQuery + " AND 0=1"
+    } else {
+      nativeQuery + " WHERE 0=1"
+    }
+
     log.info(s"Successfully connected to JDBC URL: $url")
+    log.info(s"Getting metadata for: $q")
+
     try {
-      withMetadataResultSet(connection, nativeQuery) { rs =>
+      withMetadataResultSet(connection, q) { rs =>
         action(connection, rs.getMetaData)
       }
     } finally {
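
Review note: the `0=1` predicate guarantees the query returns no rows while `rs.getMetaData` still describes every column, so the driver has nothing to fetch. The `contains(" where ")` check is a heuristic, which is what the ToDo about SQL generators refers to: a WHERE that occurs only inside a subquery still triggers the `AND` branch, and the predicate lands outside the subquery. The rewrite step extracted as a function with illustrative inputs (`withEmptyFilter` is a hypothetical name):

// Same logic as the computation of `q` above.
def withEmptyFilter(nativeQuery: String): String =
  if (nativeQuery.toLowerCase.contains(" where "))
    nativeQuery + " AND 0=1"
  else
    nativeQuery + " WHERE 0=1"

withEmptyFilter("SELECT * FROM t")
// "SELECT * FROM t WHERE 0=1"            -- no rows, metadata intact
withEmptyFilter("SELECT * FROM t WHERE id > 0")
// "SELECT * FROM t WHERE id > 0 AND 0=1" -- still correct
withEmptyFilter("SELECT * FROM (SELECT * FROM t WHERE id > 0) q")
// "SELECT * FROM (SELECT * FROM t WHERE id > 0) q AND 0=1" -- invalid SQL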