From b76af78e779f1ad7671ce499a2fb37abf18526d8 Mon Sep 17 00:00:00 2001 From: Ryan Syed Date: Wed, 13 Dec 2023 17:49:07 -0800 Subject: [PATCH] fix: Reduced SQL calls in GetObjects to two, added prefixing DbName for INFORMATION_SCHEMA calls, and removed SQL cursor code Additional changes: * Reduced SQL calls by making only 1 - 2 SQL calls based on the ObjectsDepth and using that data to populate all the previous depth information * There is one SHOW TERSE DATABASES that is always called and the catalogPattern is used to filter the databases and prepare the SQL based on the depth for schema, tables, or columns * The SQL cursor code was removed and replaced with a static SQL that is prepared in Go based on the databases that match the catalogPattern * GetObjects populates the MetadataRecords by making the necessary SQL call based on ObjectsDepth * Modified the logic of GetObjects Init to pass MetadataRecords in getObjectsDbSchemas and getObjectsTables * Modified tests to check the table type returned --- .../Drivers/Interop/Snowflake/DriverTests.cs | 18 +- .../driver/flightsql/flightsql_connection.go | 4 +- go/adbc/driver/internal/shared_utils.go | 21 +- go/adbc/driver/snowflake/connection.go | 664 +++++++++++++----- 4 files changed, 505 insertions(+), 202 deletions(-) diff --git a/csharp/test/Drivers/Interop/Snowflake/DriverTests.cs b/csharp/test/Drivers/Interop/Snowflake/DriverTests.cs index ea3574fca2..c60218d5f1 100644 --- a/csharp/test/Drivers/Interop/Snowflake/DriverTests.cs +++ b/csharp/test/Drivers/Interop/Snowflake/DriverTests.cs @@ -82,12 +82,6 @@ public DriverTests() Dictionary options = new Dictionary(); _snowflakeDriver = SnowflakeTestingUtils.GetSnowflakeAdbcDriver(_testConfiguration, out parameters); - string databaseName = _testConfiguration.Metadata.Catalog; - string schemaName = _testConfiguration.Metadata.Schema; - - parameters[SnowflakeParameters.DATABASE] = databaseName; - parameters[SnowflakeParameters.SCHEMA] = schemaName; - _database = 
_snowflakeDriver.Open(parameters); _connection = _database.Connect(options); } @@ -214,7 +208,7 @@ public void CanGetObjectsTables(string tableNamePattern) string tableName = _testConfiguration.Metadata.Table; using IArrowArrayStream stream = _connection.GetObjects( - depth: AdbcConnection.GetObjectsDepth.All, + depth: AdbcConnection.GetObjectsDepth.Tables, catalogPattern: databaseName, dbSchemaPattern: schemaName, tableNamePattern: tableNamePattern, @@ -235,6 +229,7 @@ public void CanGetObjectsTables(string tableNamePattern) AdbcTable table = tables.Where((table) => string.Equals(table.Name, tableName)).FirstOrDefault(); Assert.True(table != null, "table should not be null"); + Assert.Equal("BASE TABLE", table.Type); } /// @@ -260,8 +255,7 @@ public void CanGetObjectsAll() using RecordBatch recordBatch = stream.ReadNextRecordBatchAsync().Result; List catalogs = GetObjectsParser.ParseCatalog(recordBatch, databaseName, schemaName); - - List columns = catalogs + AdbcTable table = catalogs .Where(c => string.Equals(c.Name, databaseName)) .Select(c => c.DbSchemas) .FirstOrDefault() @@ -269,9 +263,13 @@ public void CanGetObjectsAll() .Select(s => s.Tables) .FirstOrDefault() .Where(t => string.Equals(t.Name, tableName)) - .Select(t => t.Columns) .FirstOrDefault(); + + Assert.True(table != null, "table should not be null"); + Assert.Equal("BASE TABLE", table.Type); + List columns = table.Columns; + Assert.True(columns != null, "Columns cannot be null"); Assert.Equal(_testConfiguration.Metadata.ExpectedColumnCount, columns.Count); diff --git a/go/adbc/driver/flightsql/flightsql_connection.go b/go/adbc/driver/flightsql/flightsql_connection.go index a541bfeb21..ee24b75ee2 100644 --- a/go/adbc/driver/flightsql/flightsql_connection.go +++ b/go/adbc/driver/flightsql/flightsql_connection.go @@ -547,7 +547,7 @@ func (c *cnxn) readInfo(ctx context.Context, expectedSchema *arrow.Schema, info } // Helper function to build up a map of catalogs to DB schemas -func (c *cnxn) 
getObjectsDbSchemas(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string) (result map[string][]string, err error) { +func (c *cnxn) getObjectsDbSchemas(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, metadataRecords []internal.Metadata) (result map[string][]string, err error) { if depth == adbc.ObjectDepthCatalogs { return } @@ -588,7 +588,7 @@ func (c *cnxn) getObjectsDbSchemas(ctx context.Context, depth adbc.ObjectDepth, return } -func (c *cnxn) getObjectsTables(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string) (result internal.SchemaToTableInfo, err error) { +func (c *cnxn) getObjectsTables(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string, metadataRecords []internal.Metadata) (result internal.SchemaToTableInfo, err error) { if depth == adbc.ObjectDepthCatalogs || depth == adbc.ObjectDepthDBSchemas { return } diff --git a/go/adbc/driver/internal/shared_utils.go b/go/adbc/driver/internal/shared_utils.go index a6b285f020..3a5791192c 100644 --- a/go/adbc/driver/internal/shared_utils.go +++ b/go/adbc/driver/internal/shared_utils.go @@ -19,9 +19,11 @@ package internal import ( "context" + "database/sql" "regexp" "strconv" "strings" + "time" "github.com/apache/arrow-adbc/go/adbc" "github.com/apache/arrow/go/v14/arrow" @@ -38,8 +40,18 @@ type TableInfo struct { Schema *arrow.Schema } -type GetObjDBSchemasFn func(ctx context.Context, depth adbc.ObjectDepth, catalog *string, schema *string) (map[string][]string, error) -type GetObjTablesFn func(ctx context.Context, depth adbc.ObjectDepth, catalog *string, schema *string, tableName *string, columnName *string, tableType []string) (map[CatalogAndSchema][]TableInfo, error) +type Metadata struct { + Created time.Time + ColName, DataType string + Dbname, Kind, Schema, TblName, TblType, IdentGen, 
IdentIncrement, Comment sql.NullString + OrdinalPos int + NumericPrec, NumericPrecRadix, NumericScale, DatetimePrec sql.NullInt16 + IsNullable, IsIdent bool + CharMaxLength, CharOctetLength sql.NullInt32 +} + +type GetObjDBSchemasFn func(ctx context.Context, depth adbc.ObjectDepth, catalog *string, schema *string, metadataRecords []Metadata) (map[string][]string, error) +type GetObjTablesFn func(ctx context.Context, depth adbc.ObjectDepth, catalog *string, schema *string, tableName *string, columnName *string, tableType []string, metadataRecords []Metadata) (map[CatalogAndSchema][]TableInfo, error) type SchemaToTableInfo = map[CatalogAndSchema][]TableInfo // Helper function that compiles a SQL-style pattern (%, _) to a regex @@ -87,6 +99,7 @@ type GetObjects struct { builder *array.RecordBuilder schemaLookup map[string][]string tableLookup map[CatalogAndSchema][]TableInfo + MetadataRecords []Metadata catalogPattern *regexp.Regexp columnNamePattern *regexp.Regexp @@ -123,13 +136,13 @@ type GetObjects struct { } func (g *GetObjects) Init(mem memory.Allocator, getObj GetObjDBSchemasFn, getTbls GetObjTablesFn) error { - if catalogToDbSchemas, err := getObj(g.Ctx, g.Depth, g.Catalog, g.DbSchema); err != nil { + if catalogToDbSchemas, err := getObj(g.Ctx, g.Depth, g.Catalog, g.DbSchema, g.MetadataRecords); err != nil { return err } else { g.schemaLookup = catalogToDbSchemas } - if tableLookup, err := getTbls(g.Ctx, g.Depth, g.Catalog, g.DbSchema, g.TableName, g.ColumnName, g.TableType); err != nil { + if tableLookup, err := getTbls(g.Ctx, g.Depth, g.Catalog, g.DbSchema, g.TableName, g.ColumnName, g.TableType, g.MetadataRecords); err != nil { return err } else { g.tableLookup = tableLookup diff --git a/go/adbc/driver/snowflake/connection.go b/go/adbc/driver/snowflake/connection.go index e96799933f..1a12e8175d 100644 --- a/go/adbc/driver/snowflake/connection.go +++ b/go/adbc/driver/snowflake/connection.go @@ -23,6 +23,7 @@ import ( "database/sql/driver" "fmt" "io" + 
"regexp" "strconv" "strings" "time" @@ -238,84 +239,59 @@ func (c *cnxn) GetInfo(ctx context.Context, infoCodes []adbc.InfoCode) (array.Re // All non-empty, non-nil strings should be a search pattern (as described // earlier). func (c *cnxn) GetObjects(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string) (array.RecordReader, error) { - g := internal.GetObjects{Ctx: ctx, Depth: depth, Catalog: catalog, DbSchema: dbSchema, TableName: tableName, ColumnName: columnName, TableType: tableType} - if err := g.Init(c.db.Alloc, c.getObjectsDbSchemas, c.getObjectsTables); err != nil { + metadataRecords, err := c.populateMetadata(ctx, depth, catalog, dbSchema, tableName, columnName, tableType) + if err != nil { return nil, err } - defer g.Release() - rows, err := c.sqldb.QueryContext(ctx, "SHOW TERSE DATABASES", nil) - if err != nil { + g := internal.GetObjects{Ctx: ctx, Depth: depth, Catalog: catalog, DbSchema: dbSchema, TableName: tableName, ColumnName: columnName, TableType: tableType} + g.MetadataRecords = metadataRecords + if err := g.Init(c.db.Alloc, c.getObjectsDbSchemas, c.getObjectsTables); err != nil { return nil, err } - defer rows.Close() - - var ( - created time.Time - name string - kind, dbname, schema sql.NullString - ) - for rows.Next() { - if err := rows.Scan(&created, &name, &kind, &dbname, &schema); err != nil { - return nil, errToAdbcErr(adbc.StatusInvalidData, err) - } + defer g.Release() - // SNOWFLAKE catalog contains functions and no tables - if name == "SNOWFLAKE" { + uniqueCatalogs := make(map[string]bool) + for _, data := range metadataRecords { + if !data.Dbname.Valid { continue } - // schema for SHOW TERSE DATABASES is: - // created_on:timestamp, name:text, kind:null, database_name:null, schema_name:null - // the last three columns are always null because they are not applicable for databases - // so we want values[1].(string) for the name - g.AppendCatalog(name) 
+ if _, exists := uniqueCatalogs[data.Dbname.String]; !exists { + uniqueCatalogs[data.Dbname.String] = true + g.AppendCatalog(data.Dbname.String) + } } return g.Finish() } -func (c *cnxn) getObjectsDbSchemas(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string) (result map[string][]string, err error) { +func (c *cnxn) getObjectsDbSchemas(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, metadataRecords []internal.Metadata) (result map[string][]string, err error) { if depth == adbc.ObjectDepthCatalogs { return } - conditions := make([]string, 0) - if catalog != nil && *catalog != "" { - conditions = append(conditions, ` CATALOG_NAME ILIKE '`+*catalog+`'`) - } - if dbSchema != nil && *dbSchema != "" { - conditions = append(conditions, ` SCHEMA_NAME ILIKE '`+*dbSchema+`'`) - } - - cond := strings.Join(conditions, " AND ") - result = make(map[string][]string) + uniqueCatalogSchema := make(map[string]map[string]bool) - query := `SELECT CATALOG_NAME, SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA` - if cond != "" { - query += " WHERE " + cond - } - var rows *sql.Rows - rows, err = c.sqldb.QueryContext(ctx, query) - if err != nil { - err = errToAdbcErr(adbc.StatusIO, err) - return - } - defer rows.Close() + for _, data := range metadataRecords { + if !data.Dbname.Valid || !data.Schema.Valid { + continue + } - var catalogName, schemaName string - for rows.Next() { - if err = rows.Scan(&catalogName, &schemaName); err != nil { - err = errToAdbcErr(adbc.StatusIO, err) - return + if _, exists := uniqueCatalogSchema[data.Dbname.String]; !exists { + uniqueCatalogSchema[data.Dbname.String] = make(map[string]bool) } - cat, ok := result[catalogName] - if !ok { + cat, exists := result[data.Dbname.String] + if !exists { cat = make([]string, 0, 1) } - result[catalogName] = append(cat, schemaName) + + if _, exists := uniqueCatalogSchema[data.Dbname.String][data.Schema.String]; !exists { + result[data.Dbname.String] = append(cat, 
data.Schema.String) + uniqueCatalogSchema[data.Dbname.String][data.Schema.String] = true + } } return @@ -477,7 +453,7 @@ func toXdbcDataType(dt arrow.DataType) (xdbcType internal.XdbcDataType) { } } -func (c *cnxn) getObjectsTables(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string) (result internal.SchemaToTableInfo, err error) { +func (c *cnxn) getObjectsTables(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string, metadataRecords []internal.Metadata) (result internal.SchemaToTableInfo, err error) { if depth == adbc.ObjectDepthCatalogs || depth == adbc.ObjectDepthDBSchemas { return } @@ -485,152 +461,45 @@ func (c *cnxn) getObjectsTables(ctx context.Context, depth adbc.ObjectDepth, cat result = make(internal.SchemaToTableInfo) includeSchema := depth == adbc.ObjectDepthAll || depth == adbc.ObjectDepthColumns - conditions := make([]string, 0) - if catalog != nil && *catalog != "" { - conditions = append(conditions, ` TABLE_CATALOG ILIKE '`+*catalog+`'`) - } - if dbSchema != nil && *dbSchema != "" { - conditions = append(conditions, ` TABLE_SCHEMA ILIKE '`+*dbSchema+`'`) - } - if tableName != nil && *tableName != "" { - conditions = append(conditions, ` TABLE_NAME ILIKE '`+*tableName+`'`) - } - - // first populate the tables and table types - var rows *sql.Rows - var tblConditions []string - if len(tableType) > 0 { - tblConditions = append(conditions, ` TABLE_TYPE IN ('`+strings.Join(tableType, `','`)+`')`) - } else { - tblConditions = conditions - } + uniqueCatalogSchemaTable := make(map[string]map[string]map[string]bool) + for _, data := range metadataRecords { + if !data.Dbname.Valid || !data.Schema.Valid || !data.TblName.Valid || !data.TblType.Valid { + continue + } - cond := strings.Join(tblConditions, " AND ") - query := "SELECT table_catalog, table_schema, table_name, table_type FROM 
INFORMATION_SCHEMA.TABLES" - if cond != "" { - query += " WHERE " + cond - } - rows, err = c.sqldb.QueryContext(ctx, query) - if err != nil { - err = errToAdbcErr(adbc.StatusIO, err) - return - } - defer rows.Close() + if _, exists := uniqueCatalogSchemaTable[data.Dbname.String]; !exists { + uniqueCatalogSchemaTable[data.Dbname.String] = make(map[string]map[string]bool) + } - var tblCat, tblSchema, tblName string - var tblType sql.NullString - for rows.Next() { - if err = rows.Scan(&tblCat, &tblSchema, &tblName, &tblType); err != nil { - err = errToAdbcErr(adbc.StatusIO, err) - return + if _, exists := uniqueCatalogSchemaTable[data.Dbname.String][data.Schema.String]; !exists { + uniqueCatalogSchemaTable[data.Dbname.String][data.Schema.String] = make(map[string]bool) } - key := internal.CatalogAndSchema{ - Catalog: tblCat, Schema: tblSchema} + if _, exists := uniqueCatalogSchemaTable[data.Dbname.String][data.Schema.String][data.TblName.String]; !exists { + uniqueCatalogSchemaTable[data.Dbname.String][data.Schema.String][data.TblName.String] = true - result[key] = append(result[key], internal.TableInfo{ - Name: tblName, TableType: tblType.String}) - } + key := internal.CatalogAndSchema{ + Catalog: data.Dbname.String, Schema: data.Schema.String} - if includeSchema { - conditions := make([]string, 0) - if catalog != nil && *catalog != "" { - conditions = append(conditions, ` TABLE_CATALOG ILIKE \'`+*catalog+`\'`) - } - if dbSchema != nil && *dbSchema != "" { - conditions = append(conditions, ` TABLE_SCHEMA ILIKE \'`+*dbSchema+`\'`) - } - if tableName != nil && *tableName != "" { - conditions = append(conditions, ` TABLE_NAME ILIKE \'`+*tableName+`\'`) - } - // if we need to include the schemas of the tables, make another fetch - // to fetch the columns and column info - if columnName != nil && *columnName != "" { - conditions = append(conditions, ` column_name ILIKE \'`+*columnName+`\'`) - } - cond = strings.Join(conditions, " AND ") - if cond != "" { - cond = " WHERE 
" + cond - } - cond = `statement := 'SELECT * FROM (' || statement || ')` + cond + - ` ORDER BY table_catalog, table_schema, table_name, ordinal_position';` - - var queryPrefix = `DECLARE - c1 CURSOR FOR SELECT DATABASE_NAME FROM INFORMATION_SCHEMA.DATABASES; - res RESULTSET; - counter INTEGER DEFAULT 0; - statement VARCHAR DEFAULT ''; - BEGIN - FOR rec IN c1 DO - IF (counter > 0) THEN - statement := statement || ' UNION ALL '; - END IF; - ` - - const getSchema = `statement := statement || - ' SELECT - table_catalog, table_schema, table_name, column_name, - ordinal_position, is_nullable::boolean, data_type, numeric_precision, - numeric_precision_radix, numeric_scale, is_identity::boolean, - identity_generation, identity_increment, - character_maximum_length, character_octet_length, datetime_precision, comment - FROM ' || rec.database_name || '.INFORMATION_SCHEMA.COLUMNS'; - - counter := counter + 1; - END FOR; - ` - - const querySuffix = ` - res := (EXECUTE IMMEDIATE :statement); - RETURN TABLE (res); - END;` - - if catalog != nil && *catalog != "" { - queryPrefix = `DECLARE - c1 CURSOR FOR SELECT DATABASE_NAME FROM INFORMATION_SCHEMA.DATABASES WHERE DATABASE_NAME ILIKE '` + *catalog + `';` + - `res RESULTSET; - counter INTEGER DEFAULT 0; - statement VARCHAR DEFAULT ''; - BEGIN - FOR rec IN c1 DO - IF (counter > 0) THEN - statement := statement || ' UNION ALL '; - END IF; - ` + result[key] = append(result[key], internal.TableInfo{ + Name: data.TblName.String, TableType: data.TblType.String}) } - query = queryPrefix + getSchema + cond + querySuffix - rows, err = c.sqldb.QueryContext(ctx, query) - if err != nil { - return - } - defer rows.Close() + } + if includeSchema { var ( - colName, dataType string - identGen, identIncrement, comment sql.NullString - ordinalPos int - numericPrec, numericPrecRadix, numericScale, datetimePrec sql.NullInt16 - isNullable, isIdent bool - charMaxLength, charOctetLength sql.NullInt32 - prevKey internal.CatalogAndSchema curTableInfo 
*internal.TableInfo fieldList = make([]arrow.Field, 0) ) - for rows.Next() { - // order here matches the order of the columns requested in the query - err = rows.Scan(&tblCat, &tblSchema, &tblName, &colName, - &ordinalPos, &isNullable, &dataType, &numericPrec, - &numericPrecRadix, &numericScale, &isIdent, &identGen, - &identIncrement, &charMaxLength, &charOctetLength, &datetimePrec, &comment) - if err != nil { - err = errToAdbcErr(adbc.StatusIO, err) - return + for _, data := range metadataRecords { + if !data.Dbname.Valid || !data.Schema.Valid || !data.TblName.Valid { + continue } - key := internal.CatalogAndSchema{Catalog: tblCat, Schema: tblSchema} - if prevKey != key || (curTableInfo != nil && curTableInfo.Name != tblName) { + key := internal.CatalogAndSchema{Catalog: data.Dbname.String, Schema: data.Schema.String} + if prevKey != key || (curTableInfo != nil && curTableInfo.Name != data.TblName.String) { if len(fieldList) > 0 && curTableInfo != nil { curTableInfo.Schema = arrow.NewSchema(fieldList, nil) fieldList = fieldList[:0] @@ -638,7 +507,7 @@ func (c *cnxn) getObjectsTables(ctx context.Context, depth adbc.ObjectDepth, cat info := result[key] for i := range info { - if info[i].Name == tblName { + if info[i].Name == data.TblName.String { curTableInfo = &info[i] break } @@ -646,7 +515,7 @@ func (c *cnxn) getObjectsTables(ctx context.Context, depth adbc.ObjectDepth, cat } prevKey = key - fieldList = append(fieldList, toField(colName, isNullable, dataType, numericPrec, numericPrecRadix, numericScale, isIdent, c.useHighPrecision, identGen, identIncrement, charMaxLength, charOctetLength, datetimePrec, comment, ordinalPos)) + fieldList = append(fieldList, toField(data.ColName, data.IsNullable, data.DataType, data.NumericPrec, data.NumericPrecRadix, data.NumericScale, data.IsIdent, c.useHighPrecision, data.IdentGen, data.IdentIncrement, data.CharMaxLength, data.CharOctetLength, data.DatetimePrec, data.Comment, data.OrdinalPos)) } if len(fieldList) > 0 && 
curTableInfo != nil { @@ -656,6 +525,429 @@ func (c *cnxn) getObjectsTables(ctx context.Context, depth adbc.ObjectDepth, cat return } +func (c *cnxn) populateMetadata(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string) ([]internal.Metadata, error) { + var metadataRecords []internal.Metadata + var catalogMetadataRecords []internal.Metadata + + rows, err := c.sqldb.QueryContext(ctx, prepareCatalogsSQL(), nil) + if err != nil { + return nil, err + } + + for rows.Next() { + var data internal.Metadata + var skipDbNullField, skipSchemaNullField sql.NullString + // schema for SHOW TERSE DATABASES is: + // created_on:timestamp, name:text, kind:null, database_name:null, schema_name:null + // the last three columns are always null because they are not applicable for databases + // so we want values[1].(string) for the name + if err := rows.Scan(&data.Created, &data.Dbname, &data.Kind, &skipDbNullField, &skipSchemaNullField); err != nil { + return nil, errToAdbcErr(adbc.StatusInvalidData, err) + } + + // SNOWFLAKE catalog contains functions and no tables + if data.Dbname.Valid && data.Dbname.String == "SNOWFLAKE" { + continue + } + + if depth == adbc.ObjectDepthCatalogs { + metadataRecords = append(metadataRecords, data) + } else { + catalogMetadataRecords = append(catalogMetadataRecords, data) + } + } + + if depth == adbc.ObjectDepthCatalogs { + return metadataRecords, nil + } + + catalogNames, err := getCatalogNames(catalogMetadataRecords, catalog) + if err != nil { + return nil, errToAdbcErr(adbc.StatusInvalidData, err) + } + + if depth == adbc.ObjectDepthDBSchemas { + query := prepareDbSchemasSQL(catalogNames, catalog, dbSchema) + rows, err := c.sqldb.QueryContext(ctx, query) + if err != nil { + return nil, errToAdbcErr(adbc.StatusIO, err) + } + defer rows.Close() + + for rows.Next() { + var data internal.Metadata + if err = rows.Scan(&data.Dbname, &data.Schema); err != nil { + return 
nil, errToAdbcErr(adbc.StatusIO, err) + } + metadataRecords = append(metadataRecords, data) + } + } else if depth == adbc.ObjectDepthTables { + query := prepareTablesSQL(catalogNames, catalog, dbSchema, tableName, tableType) + rows, err := c.sqldb.QueryContext(ctx, query) + if err != nil { + return nil, errToAdbcErr(adbc.StatusIO, err) + } + defer rows.Close() + + for rows.Next() { + var data internal.Metadata + if err = rows.Scan(&data.Dbname, &data.Schema, &data.TblName, &data.TblType); err != nil { + return nil, errToAdbcErr(adbc.StatusIO, err) + } + metadataRecords = append(metadataRecords, data) + } + } else { + query := prepareColumnsSQL(catalogNames, catalog, dbSchema, tableName, columnName) + rows, err := c.sqldb.QueryContext(ctx, query) + if err != nil { + return nil, errToAdbcErr(adbc.StatusIO, err) + } + defer rows.Close() + + var data internal.Metadata + + for rows.Next() { + // order here matches the order of the columns requested in the query + err = rows.Scan(&data.TblType, &data.Dbname, &data.Schema, &data.TblName, &data.ColName, + &data.OrdinalPos, &data.IsNullable, &data.DataType, &data.NumericPrec, + &data.NumericPrecRadix, &data.NumericScale, &data.IsIdent, &data.IdentGen, + &data.IdentIncrement, &data.CharMaxLength, &data.CharOctetLength, &data.DatetimePrec, &data.Comment) + if err != nil { + return nil, errToAdbcErr(adbc.StatusIO, err) + } + metadataRecords = append(metadataRecords, data) + } + } + + return metadataRecords, nil +} + +func getCatalogNames(metadataRecords []internal.Metadata, catalog *string) ([]string, error) { + catalogNames := make([]string, 0) + var catalogPattern *regexp.Regexp + var err error + if catalogPattern, err = internal.PatternToRegexp(catalog); err != nil { + return nil, adbc.Error{ + Msg: err.Error(), + Code: adbc.StatusInvalidArgument, + } + } + + for _, data := range metadataRecords { + if data.Dbname.Valid && data.Dbname.String == "SNOWFLAKE" { + continue + } + if catalogPattern != nil && 
!catalogPattern.MatchString(data.Dbname.String) { + continue + } + + catalogNames = append(catalogNames, data.Dbname.String) + } + return catalogNames, nil +} + +func prepareCatalogsSQL() string { + return "SHOW TERSE DATABASES" +} + +// func prepareDbSchemasSQL(catalog *string, dbSchema *string) string { +// conditions := make([]string, 0) +// if catalog != nil && *catalog != "" { +// conditions = append(conditions, ` CATALOG_NAME ILIKE \'`+*catalog+`\'`) +// } +// if dbSchema != nil && *dbSchema != "" { +// conditions = append(conditions, ` SCHEMA_NAME ILIKE \'`+*dbSchema+`\'`) +// } + +// cond := strings.Join(conditions, " AND ") +// if cond != "" { +// cond = `statement := 'SELECT * FROM (' || statement || ') WHERE ` + cond + `';` +// } + +// queryPrefix := `DECLARE +// c1 CURSOR FOR SELECT DATABASE_NAME FROM INFORMATION_SCHEMA.DATABASES; +// res RESULTSET; +// counter INTEGER DEFAULT 0; +// statement VARCHAR DEFAULT ''; +// BEGIN +// FOR rec IN c1 DO +// IF (counter > 0) THEN +// statement := statement || ' UNION ALL '; +// END IF; +// statement := statement || ' SELECT CATALOG_NAME, SCHEMA_NAME FROM "' || rec.database_name || '".INFORMATION_SCHEMA.SCHEMATA'; +// counter := counter + 1; +// END FOR; +// ` +// if catalog != nil && *catalog != "" { +// queryPrefix = `DECLARE +// c1 CURSOR FOR SELECT DATABASE_NAME FROM INFORMATION_SCHEMA.DATABASES WHERE DATABASE_NAME ILIKE '` + *catalog + `';` + +// `res RESULTSET; +// counter INTEGER DEFAULT 0; +// statement VARCHAR DEFAULT ''; +// BEGIN +// FOR rec IN c1 DO +// IF (counter > 0) THEN +// statement := statement || ' UNION ALL '; +// END IF; +// statement := statement || ' SELECT CATALOG_NAME, SCHEMA_NAME FROM "' || rec.database_name || '".INFORMATION_SCHEMA.SCHEMATA'; +// counter := counter + 1; +// END FOR; +// ` +// } +// const querySuffix = ` +// res := (EXECUTE IMMEDIATE :statement); +// RETURN TABLE (res); +// END;` + +// query := queryPrefix + cond + querySuffix +// return query +// } + +// func 
prepareTablesSQL(catalog *string, dbSchema *string, tableName *string, tableType []string) string { +// conditions := make([]string, 0) +// if catalog != nil && *catalog != "" { +// conditions = append(conditions, ` TABLE_CATALOG ILIKE \'`+*catalog+`\'`) +// } +// if dbSchema != nil && *dbSchema != "" { +// conditions = append(conditions, ` TABLE_SCHEMA ILIKE \'`+*dbSchema+`\'`) +// } +// if tableName != nil && *tableName != "" { +// conditions = append(conditions, ` TABLE_NAME ILIKE \'`+*tableName+`\'`) +// } + +// // first populate the tables and table types +// var tblConditions []string +// if len(tableType) > 0 { +// tblConditions = append(conditions, ` TABLE_TYPE IN (\'`+strings.Join(tableType, `\',\'`)+`\')`) +// } else { +// tblConditions = conditions +// } + +// queryPrefix := `DECLARE +// c1 CURSOR FOR SELECT DATABASE_NAME FROM INFORMATION_SCHEMA.DATABASES; +// res RESULTSET; +// counter INTEGER DEFAULT 0; +// statement VARCHAR DEFAULT ''; +// BEGIN +// FOR rec IN c1 DO +// IF (counter > 0) THEN +// statement := statement || ' UNION ALL '; +// END IF; +// ` +// if catalog != nil && *catalog != "" { +// queryPrefix = `DECLARE +// c1 CURSOR FOR SELECT DATABASE_NAME FROM INFORMATION_SCHEMA.DATABASES WHERE DATABASE_NAME ILIKE '` + *catalog + `';` + +// `res RESULTSET; +// counter INTEGER DEFAULT 0; +// statement VARCHAR DEFAULT ''; +// BEGIN +// FOR rec IN c1 DO +// IF (counter > 0) THEN +// statement := statement || ' UNION ALL '; +// END IF; +// ` +// } +// const noSchema = `statement := statement || ' SELECT table_catalog, table_schema, table_name, table_type FROM "' || rec.database_name || '".INFORMATION_SCHEMA.TABLES'; +// counter := counter + 1; +// END FOR; +// ` +// const querySuffix = ` +// res := (EXECUTE IMMEDIATE :statement); +// RETURN TABLE (res); +// END;` + +// cond := strings.Join(tblConditions, " AND ") +// if cond != "" { +// cond = `statement := 'SELECT * FROM (' || statement || ') WHERE ` + cond + `';` +// } +// query := queryPrefix + 
noSchema + cond + querySuffix +// return query +// } + +func prepareDbSchemasSQL(catalogNames []string, catalog *string, dbSchema *string) string { + conditions := make([]string, 0) + if catalog != nil && *catalog != "" { + conditions = append(conditions, ` CATALOG_NAME ILIKE '`+*catalog+`'`) + } + if dbSchema != nil && *dbSchema != "" { + conditions = append(conditions, ` SCHEMA_NAME ILIKE '`+*dbSchema+`'`) + } + + cond := strings.Join(conditions, " AND ") + + query := "" + for _, catalog_name := range catalogNames { + if query != "" { + query += " UNION ALL " + } + query += `SELECT CATALOG_NAME, SCHEMA_NAME FROM "` + catalog_name + `".INFORMATION_SCHEMA.SCHEMATA` + } + + query = `SELECT * FROM (` + query + `)` + if cond != "" { + query += " WHERE " + cond + } + + return query +} + +func prepareTablesSQL(catalogNames []string, catalog *string, dbSchema *string, tableName *string, tableType []string) string { + conditions := make([]string, 0) + if catalog != nil && *catalog != "" { + conditions = append(conditions, ` TABLE_CATALOG ILIKE '`+*catalog+`'`) + } + if dbSchema != nil && *dbSchema != "" { + conditions = append(conditions, ` TABLE_SCHEMA ILIKE '`+*dbSchema+`'`) + } + if tableName != nil && *tableName != "" { + conditions = append(conditions, ` TABLE_NAME ILIKE '`+*tableName+`'`) + } + + var tblConditions []string + if len(tableType) > 0 { + tblConditions = append(conditions, ` TABLE_TYPE IN ('`+strings.Join(tableType, `','`)+`')`) + } else { + tblConditions = conditions + } + + cond := strings.Join(tblConditions, " AND ") + query := "" + for _, catalog_name := range catalogNames { + if query != "" { + query += " UNION ALL " + } + query += `SELECT table_catalog, table_schema, table_name, table_type FROM "` + catalog_name + `".INFORMATION_SCHEMA.TABLES` + } + + query = `SELECT * FROM (` + query + `)` + if cond != "" { + query += " WHERE " + cond + } + return query +} + +func prepareColumnsSQL(catalogNames []string, catalog *string, dbSchema *string, 
tableName *string, columnName *string) string { + conditions := make([]string, 0) + if catalog != nil && *catalog != "" { + conditions = append(conditions, ` TABLE_CATALOG ILIKE '`+*catalog+`'`) + } + if dbSchema != nil && *dbSchema != "" { + conditions = append(conditions, ` TABLE_SCHEMA ILIKE '`+*dbSchema+`'`) + } + if tableName != nil && *tableName != "" { + conditions = append(conditions, ` TABLE_NAME ILIKE '`+*tableName+`'`) + } + if columnName != nil && *columnName != "" { + conditions = append(conditions, ` column_name ILIKE '`+*columnName+`'`) + } + cond := strings.Join(conditions, " AND ") + if cond != "" { + cond = " WHERE " + cond + } + + prefixQuery := "" + for _, catalog_name := range catalogNames { + if prefixQuery != "" { + prefixQuery += " UNION ALL " + } + prefixQuery += `SELECT T.table_type, + C.table_catalog, C.table_schema, C.table_name, C.column_name, + C.ordinal_position, C.is_nullable::boolean, C.data_type, C.numeric_precision, + C.numeric_precision_radix, C.numeric_scale, C.is_identity::boolean, + C.identity_generation, C.identity_increment, + C.character_maximum_length, C.character_octet_length, C.datetime_precision, C.comment + FROM + "` + catalog_name + `".INFORMATION_SCHEMA.TABLES AS T + JOIN + "` + catalog_name + `".INFORMATION_SCHEMA.COLUMNS AS C + ON + T.table_catalog = C.table_catalog + AND T.table_schema = C.table_schema + AND t.table_name = C.table_name` + } + + prefixQuery = `SELECT * FROM (` + prefixQuery + `)` + ordering := ` ORDER BY table_catalog, table_schema, table_name, ordinal_position` + query := prefixQuery + cond + ordering + return query +} + +// func prepareColumnsSQL(catalog *string, dbSchema *string, tableName *string, columnName *string) string { +// conditions := make([]string, 0) +// if catalog != nil && *catalog != "" { +// conditions = append(conditions, ` TABLE_CATALOG ILIKE \'`+*catalog+`\'`) +// } +// if dbSchema != nil && *dbSchema != "" { +// conditions = append(conditions, ` TABLE_SCHEMA ILIKE 
\'`+*dbSchema+`\'`) +// } +// if tableName != nil && *tableName != "" { +// conditions = append(conditions, ` TABLE_NAME ILIKE \'`+*tableName+`\'`) +// } + +// if columnName != nil && *columnName != "" { +// conditions = append(conditions, ` column_name ILIKE \'`+*columnName+`\'`) +// } +// cond := strings.Join(conditions, " AND ") +// if cond != "" { +// cond = " WHERE " + cond +// } +// cond = `statement := 'SELECT * FROM (' || statement || ')` + cond + +// ` ORDER BY table_catalog, table_schema, table_name, ordinal_position';` + +// var queryPrefix = `DECLARE +// c1 CURSOR FOR SELECT DATABASE_NAME FROM INFORMATION_SCHEMA.DATABASES; +// res RESULTSET; +// counter INTEGER DEFAULT 0; +// statement VARCHAR DEFAULT ''; +// BEGIN +// FOR rec IN c1 DO +// IF (counter > 0) THEN +// statement := statement || ' UNION ALL '; +// END IF; +// ` + +// const getSchema = `statement := statement || +// 'SELECT T.table_type, +// C.table_catalog, C.table_schema, C.table_name, C.column_name, +// C.ordinal_position, C.is_nullable::boolean, C.data_type, C.numeric_precision, +// C.numeric_precision_radix, C.numeric_scale, C.is_identity::boolean, +// C.identity_generation, C.identity_increment, +// C.character_maximum_length, C.character_octet_length, C.datetime_precision, C.comment +// FROM +// "' || rec.database_name || '".INFORMATION_SCHEMA.TABLES AS T +// JOIN +// "' || rec.database_name || '".INFORMATION_SCHEMA.COLUMNS AS C +// ON +// T.table_catalog = C.table_catalog +// AND T.table_schema = C.table_schema +// AND t.table_name = C.table_name'; + +// counter := counter + 1; +// END FOR; +// ` + +// const querySuffix = ` +// res := (EXECUTE IMMEDIATE :statement); +// RETURN TABLE (res); +// END;` + +// if catalog != nil && *catalog != "" { +// queryPrefix = `DECLARE +// c1 CURSOR FOR SELECT DATABASE_NAME FROM INFORMATION_SCHEMA.DATABASES WHERE DATABASE_NAME ILIKE '` + *catalog + `';` + +// `res RESULTSET; +// counter INTEGER DEFAULT 0; +// statement VARCHAR DEFAULT ''; +// BEGIN 
+// FOR rec IN c1 DO +// IF (counter > 0) THEN +// statement := statement || ' UNION ALL '; +// END IF; +// ` +// } +// query := queryPrefix + getSchema + cond + querySuffix +// return query +// } + func descToField(name, typ, isnull, primary string, comment sql.NullString) (field arrow.Field, err error) { field.Name = strings.ToLower(name) if isnull == "Y" {