From 3756eaf83a9fa5ba0a2db8ae9a632a8119275b6d Mon Sep 17 00:00:00 2001 From: Rahul Joshi Date: Tue, 23 Jul 2024 10:56:00 +0200 Subject: [PATCH 01/13] structural and content changes to the sql_database doc --- .../verified-sources/sql_database.md | 371 ++++++++++-------- 1 file changed, 215 insertions(+), 156 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md index a5869e99bd..5e638fc0da 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md @@ -50,120 +50,247 @@ We support all [SQLAlchemy dialects](https://docs.sqlalchemy.org/en/20/dialects/ Note that there many unofficial dialects, such as [DuckDB](https://duckdb.org/). ::: -## Setup Guide +## Setup -1. ### Initialize the verified source +To connect to your SQL database using `dlt` follow these steps: -To get started with your data pipeline, follow these steps: +1. Initialize a `dlt` project in the current working directory by running the following command: -1. Enter the following command: + ```sh + dlt init sql_database duckdb + ``` - ```sh - dlt init sql_database duckdb - ``` - - It will initialize - [the pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py) - with an SQL database as the [source](../../general-usage/source) and + This will add necessary files and configurations for a `dlt` pipeline with SQL database as the [source](../../general-usage/source) and [DuckDB](../destinations/duckdb.md) as the [destination](../destinations). + + :::tip + If you'd like to use a different destination, simply replace `duckdb` with the name of your preferred [destination](../destinations). + ::: + +2. Add credentials for your SQL database + + To connect to your SQL database, `dlt` would need to authenticate using necessary credentials. To enable this, paste your credentials in the `secrets.toml` file created inside the `.dlt/` folder in the following format: + ```toml + [sources.sql_database.credentials] + drivername = "mysql+pymysql" # driver name for the database + database = "Rfam" # database name + username = "rfamro" # username associated with the database + host = "mysql-rfam-public.ebi.ac.uk" # host address + port = "4497" # port required for connection + ``` + + Alternatively, you can also authenticate using connection strings: + ```toml + [sources.sql_database.credentials] + credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam" + ``` + + To learn more about how to pass credentials into your `dlt` pipeline see [here](../../walkthroughs/add_credentials.md). + + #### Credentials format for SQL databases + `sql_database` uses SQLAlchemy to create database connections and reflect table schemas. You can pass credentials using + [database urls](https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls). For example: + + "mysql+pymysql://rfamro:PWD@mysql-rfam-public.ebi.ac.uk:4497/Rfam"` + + will connect to `myssql` database with a name `Rfam` using `pymysql` dialect. The database host is at `mysql-rfam-public.ebi.ac.uk`, port `4497`. + User name is `rfmaro` and password is `PWD`. + +3. Add credentials for your destination (if necessary) + Depending on which [destination](../destinations) you're loading into, you might also need to add your destination credentials. 
For more information, read the [General Usage: Credentials.](../../credentials) - :::tip - If you'd like to use a different destination, simply replace `duckdb` with the name of your preferred [destination](../destinations). +4. Install any necessary dependencies + + ```sh + pip install -r requirements.txt + ``` + +5. Run the pipeline + + ```sh + python sql_database_pipeline.py + ``` + + +6. Make sure everything is loaded as expected with + ```sh + dlt pipeline show + ``` + + :::note + The pipeline_name for the above example is `rfam`, you may also use any + custom name instead. ::: -1. After running this command, a new directory will be created with the necessary files and - configuration settings to get started. +## Configuring the SQL database verified source -For more information, read the guide on [how to add a verified source](../../walkthroughs/add-a-verified-source). +The SQL database verified source has the sources and resources: +1. `sql_database`: a `dlt` source which can be used to load multiple tables and views from a SQL database +2. `sql_table` - a `dlt` resource that loads a single table from the SQL database -2. ### Add credentials +Read more about sources and resources here: [General Usage: Source](../../general-usage/source.md) and [General Usage: Resource](../../general-usage/resource.md). -1. In the `.dlt` folder, there's a file called `secrets.toml`. It's where you store sensitive - information securely, like access tokens. Keep this file safe. +#### Examples for how these sources and resources can be used: - Here's what the `secrets.toml` looks like: +**Load all the tables from a database** +Calling `sql_database()` loads all tables from the database. +```python +def load_entire_database() -> None: - ```toml - [sources.sql_database.credentials] - drivername = "mysql+pymysql" # driver name for the database - database = "Rfam" # database name - username = "rfamro" # username associated with the database - host = "mysql-rfam-public.ebi.ac.uk" # host address - port = "4497" # port required for connection - ``` + # Define the pipeline + pipeline = dlt.pipeline( + pipeline_name="rfam", + destination='synapse', + dataset_name="rfam_data" + ) -1. Alternatively, you can also provide credentials in "secrets.toml" as: + # Fetch all the tables from the database + source = sql_database() - ```toml - [sources.sql_database] - credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam" - ``` - > See - > [pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py) - > for details. + # Run the pipeline + info = pipeline.run(source, write_disposition="replace") + + # Print load info + print(info) +``` +**Load select tables from a database** +Calling `sql_database().with_resources("family", "clan")` loads only the tables `"family"` and `"clan"` from the database. +```python +def load_select_tables_from_database() -> None: -1. Finally, follow the instructions in [Destinations](../destinations/) to add credentials for your chosen destination. This will ensure that your data is properly routed. 
+ # Define the pipeline + pipeline = dlt.pipeline( + pipeline_name="rfam", + destination="postgres", + dataset_name="rfam_data" + ) -For more information, read the [General Usage: Credentials.](../../general-usage/credentials) + # Fetch tables "family" and "clan" + source = sql_database().with_resources("family", "clan") -#### Credentials format + # Run the pipeline + info = pipeline.run(source) -`sql_database` uses SQLAlchemy to create database connections and reflect table schemas. You can pass credentials using -[database urls](https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls). For example: + # Print load info + print(info) -"mysql+pymysql://rfamro:PWD@mysql-rfam-public.ebi.ac.uk:4497/Rfam"` +``` + +**Load a single table** +Calling `sql_table(table="family")` fetches only the table `"family"` +```python +def load_select_tables_from_database() -> None: + + # Define the pipeline + pipeline = dlt.pipeline( + pipeline_name="rfam", + destination="duckdb", + dataset_name="rfam_data" + ) -will connect to `myssql` database with a name `Rfam` using `pymysql` dialect. The database host is at `mysql-rfam-public.ebi.ac.uk`, port `4497`. -User name is `rfmaro` and password is `PWD`. + # Fetch the table "family" + table = sql_table(table="family") -3. ### Run the pipeline + # Run the pipeline + info = pipeline.run(table) -1. Install the necessary dependencies by running the following command: + # Print load info + print(info) + +``` + +:::tip +We intend our sources to be fully hackable. Feel free to change the source code of the sources and resources to customize it to your needs::: + +## Configuring connection to the SQL database + +### Passing credentials in `secrets.toml` or as environment variables + +By default, `dlt` looks for credentials inside `.dlt/secrets.toml` or in the environment variables. Read [here](../../walkthroughs/add_credentials.md) for more information on how to add your credentials. + + +### Passing credentials explicitly +It is also possible to explicitly pass credentials inside the source. Example: +```python +from dlt.sources.credentials import ConnectionStringCredentials +from sql_database import sql_table + +credentials = ConnectionStringCredentials( + "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam" +) - ```sh - pip install -r requirements.txt +source = sql_table(credentials).with_resource("family") +``` +:::Note: It is recommended to configure credentials in `.dlt/secrets.toml` and to not include any sensitive information in the pipeline code::: + +### Other connection options +#### Using SqlAlchemy Engine as credentials +You are able to pass an instance of **SqlAlchemy** `Engine` instance instead of credentials: +```py +from sqlalchemy import create_engine + +engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam") +table = sql_table(engine, table="chat_message", schema="data") +``` +Engine is used by `dlt` to open database connections and can work across multiple threads so is compatible with `parallelize` setting of dlt sources and resources. + +#### Connect to mysql with SSL +Here, we use the `mysql` and `pymysql` dialects to set up an SSL connection to a server, with all information taken from the [SQLAlchemy docs](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#ssl-connections). + +1. To enforce SSL on the client without a client certificate you may pass the following DSN: + + ```toml + sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca=" ``` -1. 
Run the verified source by entering: +1. You can also pass the server's public certificate (potentially bundled with your pipeline) and disable host name checks: - ```sh - python sql_database_pipeline.py + ```toml + sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca=server-ca.pem&ssl_check_hostname=false" ``` -1. Make sure that everything is loaded as expected with: +1. For servers requiring a client certificate, provide the client's private key (a secret value). In Airflow, this is usually saved as a variable and exported to a file before use. The server certificate is omitted in the example below: - ```sh - dlt pipeline show + ```toml + sources.sql_database.credentials="mysql+pymysql://root:@35.203.96.191:3306/mysql?ssl_ca=&ssl_cert=client-cert.pem&ssl_key=client-key.pem" ``` - :::note - The pipeline_name for the above example is `rfam`, you may also use any - custom name instead. - ::: +#### SQL Server connection options -## Source and resource functions -Import `sql_database` and `sql_table` functions as follows: -```py -from sql_database import sql_database, sql_table +**To connect to an `mssql` server using Windows authentication**, include `trusted_connection=yes` in the connection string. + +```toml +sources.sql_database.credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server" ``` -and read the docstrings to learn about available options. -:::tip -We intend our sources to be fully hackable. Feel free to change the code of the source to customize it to your needs -::: +**To connect to a local sql server instance running without SSL** pass `encrypt=no` parameter: +```toml +sources.sql_database.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?encrypt=no&driver=ODBC+Driver+17+for+SQL+Server" +``` + +**To allow self signed SSL certificate** when you are getting `certificate verify failed:unable to get local issuer certificate`: +```toml +sources.sql_database.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?TrustServerCertificate=yes&driver=ODBC+Driver+17+for+SQL+Server" +``` + +**To use long strings (>8k) and avoid collation errors**: +```toml +sources.sql_database.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?LongAsMax=yes&driver=ODBC+Driver+17+for+SQL+Server" +``` + +## Configuring the backend -## Pick the right backend to load table data Table backends convert stream of rows from database tables into batches in various formats. The default backend **sqlalchemy** is following standard `dlt` behavior of extracting and normalizing Python dictionaries. We recommend it for smaller tables, initial development work and when minimal dependencies or pure Python environment is required. This backend is also the slowest. Database tables are structured data and other backends speed up dealing with such data significantly. The **pyarrow** will convert rows into `arrow` tables, has good performance, preserves exact database types and we recommend it for large tables. -### **sqlalchemy** backend +### **sqlalchemy** **sqlalchemy** (the default) yields table data as list of Python dictionaries. This data goes through regular extract and normalize steps and does not require additional dependencies to be installed. It is the most robust (works with any destination, correctly represents data types) but also the slowest. You can use `reflection_level="full_with_precision"` to pass exact database types to `dlt` schema. 
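+
+For example, a minimal sketch of selecting this backend explicitly and requesting full precision reflection could look like the snippet below. It assumes (as in the other examples on this page) that the connection credentials are configured in `.dlt/secrets.toml`, and it reuses the Rfam tables `family` and `clan` from the earlier examples:
+
+```py
+import dlt
+from sql_database import sql_database
+
+pipeline = dlt.pipeline(
+    pipeline_name="rfam", destination="duckdb", dataset_name="rfam_data"
+)
+
+# "sqlalchemy" is the default backend; "full_with_precision" reflection
+# passes exact database types (including precision and scale) to the dlt schema
+source = sql_database(
+    backend="sqlalchemy",
+    reflection_level="full_with_precision",
+).with_resources("family", "clan")
+
+info = pipeline.run(source)
+print(info)
+```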
-### **pyarrow** backend +### **pyarrow** **pyarrow** yields data as Arrow tables. It uses **SqlAlchemy** to read rows in batches but then immediately converts them into `ndarray`, transposes it and uses to set columns in an arrow table. This backend always fully reflects the database table and preserves original types ie. **decimal** / **numeric** will be extracted without loss of precision. If the destination loads parquet files, this backend will skip `dlt` normalizer and you can gain two orders of magnitude (20x - 30x) speed increase. @@ -192,7 +319,7 @@ info = pipeline.run(sql_alchemy_source) print(info) ``` -### **pandas** backend +### **pandas** **pandas** backend yield data as data frames using the `pandas.io.sql` module. `dlt` use **pyarrow** dtypes by default as they generate more stable typing. @@ -232,7 +359,7 @@ info = pipeline.run(sql_alchemy_source) print(info) ``` -### **connectorx** backend +### **connectorx** [connectorx](https://sfu-db.github.io/connector-x/intro.html) backend completely skips **sqlalchemy** when reading table rows, in favor of doing that in rust. This is claimed to be significantly faster than any other method (confirmed only on postgres - see next chapter). With the default settings it will emit **pyarrow** tables, but you can configure it via **backend_kwargs**. There are certain limitations when using this backend: @@ -247,7 +374,7 @@ There are certain limitations when using this backend: from sources.sql_database.helpers import unwrap_json_connector_x ``` -Note: dlt will still use the reflected source database types to create destination tables. It is up to the destination to reconcile / parse type differences. Please note that you' still lose precision on decimals with default settings. +Note: `dlt` will still use the reflected source database types to create destination tables. It is up to the destination to reconcile / parse type differences. Please note that you' still lose precision on decimals with default settings. ```py """Uses unsw_flow dataset (~2mln rows, 25+ columns) to test connectorx speed""" @@ -284,7 +411,7 @@ print(info) ``` With dataset above and local postgres instance, connectorx is 2x faster than pyarrow backend. -### Notes on source databases +### Troubleshooting with backends #### Oracle 1. When using **oracledb** dialect in thin mode we are getting protocol errors. Use thick mode or **cx_oracle** (old) client. @@ -301,14 +428,16 @@ With dataset above and local postgres instance, connectorx is 2x faster than pya #### Postgres / MSSQL No issues found. Postgres is the only backend where we observed 2x speedup with connector x. On other db systems it performs same as `pyarrrow` backend or slower. +## Additional configurations + +### Incremental Loading -## Incremental Loading Efficient data management often requires loading only new or updated data from your SQL databases, rather than reprocessing the entire dataset. This is where incremental loading comes into play. Incremental loading uses a cursor column (e.g., timestamp or auto-incrementing ID) to load only data newer than a specified initial value, enhancing efficiency by reducing processing time and resource use. -### Configuring Incremental Loading +#### How to configure 1. **Choose a Cursor Column**: Identify a column in your SQL table that can serve as a reliable indicator of new or updated rows. Common choices include timestamp columns or auto-incrementing IDs. 1. **Set an Initial Value**: Choose a starting value for the cursor to begin loading data. 
This could be a specific timestamp or ID from which you wish to start loading data. 1. **Deduplication**: When using incremental loading, the system automatically handles the deduplication of rows based on the primary key (if available) or row hash for tables without a primary key. @@ -316,7 +445,7 @@ Incremental loading uses a cursor column (e.g., timestamp or auto-incrementing I certain range. 1. **Order returned rows**. Set `row_order` to `asc` or `desc` to order returned rows. -#### Incremental Loading Example +#### Example 1. Consider a table with a `last_modified` timestamp column. By setting this column as your cursor and specifying an initial value, the loader generates a SQL query filtering rows with `last_modified` values greater than the specified initial value. @@ -370,21 +499,15 @@ certain range. * `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appended, or replaced resources. ::: -## Run on Airflow -When running on Airflow -1. Use `dlt` [Airflow Helper](../../walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md#2-modify-dag-file) to create tasks from `sql_database` source. You should be able to run table extraction in parallel with `parallel-isolated` source->DAG conversion. -2. Reflect tables at runtime with `defer_table_reflect` argument. -3. Set `allow_external_schedulers` to load data using [Airflow intervals](../../general-usage/incremental-loading.md#using-airflow-schedule-for-backfill-and-incremental-loading). +### Parallelize extraction -## Parallel extraction You can extract each table in a separate thread (no multiprocessing at this point). This will decrease loading time if your queries take time to execute or your network latency/speed is low. ```py database = sql_database().parallelize() table = sql_table().parallelize() ``` -## Column reflection - +### Column reflection Columns and their data types are reflected with SQLAlchemy. The SQL types are then mapped to `dlt` types. Most types are supported. @@ -438,7 +561,7 @@ source = sql_database( dlt.pipeline("demo").run(source) ``` -## Extended configuration +### Configuring with toml/environment variables You are able to configure most of the arguments to `sql_database` and `sql_table` via toml files and environment variables. This is particularly useful with `sql_table` because you can maintain a separate configuration for each table (below we show **secrets.toml** and **config.toml**, you are free to combine them into one.): ```toml @@ -485,79 +608,15 @@ SOURCES__SQL_DATABASE__CHUNK_SIZE=1000 SOURCES__SQL_DATABASE__CHAT_MESSAGE__INCREMENTAL__CURSOR_PATH=updated_at ``` -### Configuring incremental loading -`dlt.sources.incremental` class is a [config spec](https://dlthub.com/docs/general-usage/credentials/config_specs) and can be configured like any other spec, here's an example that sets all possible options: -```toml -[sources.sql_database.chat_message.incremental] -cursor_path="updated_at" -initial_value=2024-05-27T07:32:00Z -end_value=2024-05-28T07:32:00Z -row_order="asc" -allow_external_schedulers=false -``` -Please note that we specify date times in **toml** as initial and end value. For env variables only strings are currently supported. +## Extended Usage +### Running on Airflow +When running on Airflow +1. 
Use `dlt` [Airflow Helper](../../walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md#2-modify-dag-file) to create tasks from `sql_database` source. You should be able to run table extraction in parallel with `parallel-isolated` source->DAG conversion. +2. Reflect tables at runtime with `defer_table_reflect` argument. +3. Set `allow_external_schedulers` to load data using [Airflow intervals](../../general-usage/incremental-loading.md#using-airflow-schedule-for-backfill-and-incremental-loading). -### Use SqlAlchemy Engine as credentials -You are able to pass an instance of **SqlAlchemy** `Engine` instance instead of credentials: -```py -from sqlalchemy import create_engine - -engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam") -table = sql_table(engine, table="chat_message", schema="data") -``` -Engine is used by `dlt` to open database connections and can work across multiple threads so is compatible with `parallelize` setting of dlt sources and resources. - - -## Troubleshooting - -### Connect to mysql with SSL -Here, we use the `mysql` and `pymysql` dialects to set up an SSL connection to a server, with all information taken from the [SQLAlchemy docs](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#ssl-connections). - -1. To enforce SSL on the client without a client certificate you may pass the following DSN: - - ```toml - sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca=" - ``` - -1. You can also pass the server's public certificate (potentially bundled with your pipeline) and disable host name checks: - - ```toml - sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca=server-ca.pem&ssl_check_hostname=false" - ``` - -1. For servers requiring a client certificate, provide the client's private key (a secret value). In Airflow, this is usually saved as a variable and exported to a file before use. The server certificate is omitted in the example below: - - ```toml - sources.sql_database.credentials="mysql+pymysql://root:@35.203.96.191:3306/mysql?ssl_ca=&ssl_cert=client-cert.pem&ssl_key=client-key.pem" - ``` - -### SQL Server connection options - -**To connect to an `mssql` server using Windows authentication**, include `trusted_connection=yes` in the connection string. - -```toml -sources.sql_database.credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server" -``` - -**To connect to a local sql server instance running without SSL** pass `encrypt=no` parameter: -```toml -sources.sql_database.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?encrypt=no&driver=ODBC+Driver+17+for+SQL+Server" -``` - -**To allow self signed SSL certificate** when you are getting `certificate verify failed:unable to get local issuer certificate`: -```toml -sources.sql_database.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?TrustServerCertificate=yes&driver=ODBC+Driver+17+for+SQL+Server" -``` - -***To use long strings (>8k) and avoid collation errors**: -```toml -sources.sql_database.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?LongAsMax=yes&driver=ODBC+Driver+17+for+SQL+Server" -``` - -## Customizations -### Transform the data in Python before it is loaded - +### Transforming data before load You have direct access to all resources (that represent tables) and you can modify hints, add python transforms, parallelize execution etc. as for any other resource. 
Below we show you an example on how to pseudonymize the data before it is loaded by using deterministic hashing. From 53c5a90804910fd190ae6c0541463b486c8fdeaf Mon Sep 17 00:00:00 2001 From: Rahul Joshi Date: Tue, 23 Jul 2024 11:33:02 +0200 Subject: [PATCH 02/13] fixing language in code snippets --- .../docs/dlt-ecosystem/verified-sources/sql_database.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md index 5e638fc0da..50f26a03b1 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md @@ -134,7 +134,7 @@ Read more about sources and resources here: [General Usage: Source](../../genera **Load all the tables from a database** Calling `sql_database()` loads all tables from the database. -```python +```py def load_entire_database() -> None: # Define the pipeline @@ -155,7 +155,7 @@ def load_entire_database() -> None: ``` **Load select tables from a database** Calling `sql_database().with_resources("family", "clan")` loads only the tables `"family"` and `"clan"` from the database. -```python +```py def load_select_tables_from_database() -> None: # Define the pipeline @@ -178,7 +178,7 @@ def load_select_tables_from_database() -> None: **Load a single table** Calling `sql_table(table="family")` fetches only the table `"family"` -```python +```py def load_select_tables_from_database() -> None: # Define the pipeline @@ -211,7 +211,7 @@ By default, `dlt` looks for credentials inside `.dlt/secrets.toml` or in the env ### Passing credentials explicitly It is also possible to explicitly pass credentials inside the source. Example: -```python +```py from dlt.sources.credentials import ConnectionStringCredentials from sql_database import sql_table From 99ba1276bc44e03f2511813fe8e75b8c7b66b33f Mon Sep 17 00:00:00 2001 From: Rahul Joshi Date: Tue, 23 Jul 2024 13:08:59 +0200 Subject: [PATCH 03/13] fixing broken link --- .../website/docs/dlt-ecosystem/verified-sources/sql_database.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md index 50f26a03b1..61e6280f27 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md @@ -97,7 +97,7 @@ To connect to your SQL database using `dlt` follow these steps: User name is `rfmaro` and password is `PWD`. 3. Add credentials for your destination (if necessary) - Depending on which [destination](../destinations) you're loading into, you might also need to add your destination credentials. For more information, read the [General Usage: Credentials.](../../credentials) + Depending on which [destination](../destinations) you're loading into, you might also need to add your destination credentials. For more information, read the [General Usage: Credentials.](../../general-usage/credentials) 4. 
Install any necessary dependencies From b34129082f9d23bc7302a0e116d6a3391fc847d6 Mon Sep 17 00:00:00 2001 From: Rahul Joshi Date: Mon, 5 Aug 2024 15:50:49 +0200 Subject: [PATCH 04/13] updating content + structure based on feedback --- .../verified-sources/sql_database.md | 480 ++++++++---------- 1 file changed, 222 insertions(+), 258 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md index 61e6280f27..f795f4a37d 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md @@ -12,7 +12,7 @@ import Header from './_source-info-header.md'; SQL databases are management systems (DBMS) that store data in a structured format, commonly used for efficient and reliable data retrieval. -Our SQL Database verified source loads data to your specified destination using SQLAlchemy, pyarrow, pandas or ConnectorX +Our SQL Database verified source loads data to your specified destination using SQLAlchemy, pyarrow, pandas, or ConnectorX :::tip View the pipeline example [here](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py). @@ -30,13 +30,13 @@ Sources and resources that can be loaded using this verified source are: We support all [SQLAlchemy dialects](https://docs.sqlalchemy.org/en/20/dialects/), which include, but are not limited to, the following database engines: -* PostgreSQL -* MySQL +* [PostgreSQL](#postgres--mssql) +* [MySQL](#mysql) * SQLite -* Oracle -* Microsoft SQL Server +* [Oracle](#oracle) +* [Microsoft SQL Server](#postgres--mssql) * MariaDB -* IBM DB2 and Informix +* [IBM DB2 and Informix](#db2) * Google BigQuery * Snowflake * Redshift @@ -87,17 +87,9 @@ To connect to your SQL database using `dlt` follow these steps: To learn more about how to pass credentials into your `dlt` pipeline see [here](../../walkthroughs/add_credentials.md). - #### Credentials format for SQL databases - `sql_database` uses SQLAlchemy to create database connections and reflect table schemas. You can pass credentials using - [database urls](https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls). For example: - - "mysql+pymysql://rfamro:PWD@mysql-rfam-public.ebi.ac.uk:4497/Rfam"` - - will connect to `myssql` database with a name `Rfam` using `pymysql` dialect. The database host is at `mysql-rfam-public.ebi.ac.uk`, port `4497`. - User name is `rfmaro` and password is `PWD`. - 3. Add credentials for your destination (if necessary) - Depending on which [destination](../destinations) you're loading into, you might also need to add your destination credentials. For more information, read the [General Usage: Credentials.](../../general-usage/credentials) + + Depending on which [destination](../destinations) you're loading into, you might also need to add your destination credentials. For more information read the [General Usage: Credentials.](../../general-usage/credentials) 4. Install any necessary dependencies @@ -119,97 +111,111 @@ To connect to your SQL database using `dlt` follow these steps: :::note The pipeline_name for the above example is `rfam`, you may also use any - custom name instead. - ::: + custom name instead. ::: + -## Configuring the SQL database verified source +## How to use -The SQL database verified source has the sources and resources: +The SQL Database verified source has two in-built sources and resources: 1. 
`sql_database`: a `dlt` source which can be used to load multiple tables and views from a SQL database -2. `sql_table` - a `dlt` resource that loads a single table from the SQL database +2. `sql_table`: a `dlt` resource that loads a single table from the SQL database Read more about sources and resources here: [General Usage: Source](../../general-usage/source.md) and [General Usage: Resource](../../general-usage/resource.md). -#### Examples for how these sources and resources can be used: +#### Examples: -**Load all the tables from a database** +1. **Load all the tables from a database** Calling `sql_database()` loads all tables from the database. -```py -def load_entire_database() -> None: + ```py + def load_entire_database() -> None: - # Define the pipeline - pipeline = dlt.pipeline( - pipeline_name="rfam", - destination='synapse', - dataset_name="rfam_data" - ) + # Define the pipeline + pipeline = dlt.pipeline( + pipeline_name="rfam", + destination='synapse', + dataset_name="rfam_data" + ) - # Fetch all the tables from the database - source = sql_database() + # Fetch all the tables from the database + source = sql_database() - # Run the pipeline - info = pipeline.run(source, write_disposition="replace") + # Run the pipeline + info = pipeline.run(source, write_disposition="replace") - # Print load info - print(info) -``` -**Load select tables from a database** + # Print load info + print(info) + ``` +2. **Load select tables from a database** Calling `sql_database().with_resources("family", "clan")` loads only the tables `"family"` and `"clan"` from the database. -```py -def load_select_tables_from_database() -> None: + ```py + def load_select_tables_from_database() -> None: - # Define the pipeline - pipeline = dlt.pipeline( - pipeline_name="rfam", - destination="postgres", - dataset_name="rfam_data" - ) + # Define the pipeline + pipeline = dlt.pipeline( + pipeline_name="rfam", + destination="postgres", + dataset_name="rfam_data" + ) - # Fetch tables "family" and "clan" - source = sql_database().with_resources("family", "clan") + # Fetch tables "family" and "clan" + source = sql_database().with_resources("family", "clan") - # Run the pipeline - info = pipeline.run(source) + # Run the pipeline + info = pipeline.run(source) - # Print load info - print(info) + # Print load info + print(info) -``` + ``` -**Load a single table** +3. **Load a standalone table** Calling `sql_table(table="family")` fetches only the table `"family"` -```py -def load_select_tables_from_database() -> None: + ```py + def load_select_tables_from_database() -> None: - # Define the pipeline - pipeline = dlt.pipeline( - pipeline_name="rfam", - destination="duckdb", - dataset_name="rfam_data" - ) + # Define the pipeline + pipeline = dlt.pipeline( + pipeline_name="rfam", + destination="duckdb", + dataset_name="rfam_data" + ) - # Fetch the table "family" - table = sql_table(table="family") + # Fetch the table "family" + table = sql_table(table="family") - # Run the pipeline - info = pipeline.run(table) + # Run the pipeline + info = pipeline.run(table) - # Print load info - print(info) + # Print load info + print(info) -``` + ``` :::tip -We intend our sources to be fully hackable. Feel free to change the source code of the sources and resources to customize it to your needs::: +We intend our sources to be fully hackable. Feel free to change the source code of the sources and resources to customize it to your needs. 
::: + ## Configuring connection to the SQL database -### Passing credentials in `secrets.toml` or as environment variables +### Connection string format +`sql_database` uses SQLAlchemy to create database connections and reflect table schemas. You can pass credentials using +[database urls](https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls). For example, to use the `pymysql` dialect to connect to a `myssql` database `Rfam` with user name `rfmaro` password `PWD` host `mysql-rfam-public.ebi.ac.uk` and port `4497`, you would construct your connection string as follows: + +"mysql+pymysql://rfamro:PWD@mysql-rfam-public.ebi.ac.uk:4497/Rfam"` + +Database-specific drivers can be passed into the connection string using query parameters. For example, to connect to Microsoft SQL Server using the ODBC Driver, you would need to pass the driver as a query parameter as follows: -By default, `dlt` looks for credentials inside `.dlt/secrets.toml` or in the environment variables. Read [here](../../walkthroughs/add_credentials.md) for more information on how to add your credentials. +"mssql+pyodbc://username:password@server/database?driver=ODBC+Driver+17+for+SQL+Server" -### Passing credentials explicitly +### Adding credentials to the `dlt` pipeline + +#### Setting them in `secrets.toml` or as environment variables (Recommended) + +By default, `dlt` looks for credentials inside `.dlt/secrets.toml` or in the environment variables. See Step 2 of the [setup](#setup) for how to set credentials inside `secrets.toml`. For more information on passing credentials through `.toml` or as enviroment variables, read [here](../../walkthroughs/add_credentials.md). + + +#### Passing them directly in the script It is also possible to explicitly pass credentials inside the source. Example: ```py from dlt.sources.credentials import ConnectionStringCredentials @@ -225,14 +231,14 @@ source = sql_table(credentials).with_resource("family") ### Other connection options #### Using SqlAlchemy Engine as credentials -You are able to pass an instance of **SqlAlchemy** `Engine` instance instead of credentials: +You are able to pass an instance of SqlAlchemy Engine instead of credentials: ```py from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam") table = sql_table(engine, table="chat_message", schema="data") ``` -Engine is used by `dlt` to open database connections and can work across multiple threads so is compatible with `parallelize` setting of dlt sources and resources. +This engine is used by `dlt` to open database connections and can work across multiple threads so is compatible with `parallelize` setting of dlt sources and resources. #### Connect to mysql with SSL Here, we use the `mysql` and `pymysql` dialects to set up an SSL connection to a server, with all information taken from the [SQLAlchemy docs](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#ssl-connections). @@ -280,20 +286,19 @@ sources.sql_database.credentials="mssql+pyodbc://loader:loader@localhost/dlt_dat ## Configuring the backend -Table backends convert stream of rows from database tables into batches in various formats. The default backend **sqlalchemy** is following standard `dlt` behavior of -extracting and normalizing Python dictionaries. We recommend it for smaller tables, initial development work and when minimal dependencies or pure Python environment is required. This backend is also the slowest. 
-Database tables are structured data and other backends speed up dealing with such data significantly. The **pyarrow** will convert rows into `arrow` tables, has -good performance, preserves exact database types and we recommend it for large tables. +Table backends convert streams of rows from database tables into batches in various formats. The default backend **sqlalchemy** follows standard `dlt` behavior of +extracting and normalizing Python dictionaries. We recommend this for smaller tables, initial development work, and when minimal dependencies or a pure Python environment is required. This backend is also the slowest. Other backends make use of the structured data format of the tables and provide significant improvement in speeds. For example, the **pyarrow** backend converts rows into `arrow` tables, which results in +good performance and preserves exact database types. We recommend using this backend for larger tables. ### **sqlalchemy** -**sqlalchemy** (the default) yields table data as list of Python dictionaries. This data goes through regular extract -and normalize steps and does not require additional dependencies to be installed. It is the most robust (works with any destination, correctly represents data types) but also the slowest. You can use `reflection_level="full_with_precision"` to pass exact database types to `dlt` schema. +The **sqlalchemy** backend (the default) yields table data as a list of Python dictionaries. This data goes through regular extract +and normalize steps and does not require additional dependencies to be installed. It is the most robust (works with any destination, correctly represents data types) but also the slowest. You can set `reflection_level="full_with_precision"` to pass exact database types to `dlt` schema. ### **pyarrow** -**pyarrow** yields data as Arrow tables. It uses **SqlAlchemy** to read rows in batches but then immediately converts them into `ndarray`, transposes it and uses to set columns in an arrow table. This backend always fully -reflects the database table and preserves original types ie. **decimal** / **numeric** will be extracted without loss of precision. If the destination loads parquet files, this backend will skip `dlt` normalizer and you can gain two orders of magnitude (20x - 30x) speed increase. +The **pyarrow** backend yields data as Arrow tables. It uses **SqlAlchemy** to read rows in batches but then immediately converts them into `ndarray`, transposes it, and sets it as columns in an arrow table. This backend always fully +reflects the database table and preserves original types (i.e. **decimal** / **numeric** data will be extracted without loss of precision). If the destination loads parquet files, this backend will skip `dlt` normalizer and you can gain two orders of magnitude (20x - 30x) speed increase. Note that if **pandas** is installed, we'll use it to convert SqlAlchemy tuples into **ndarray** as it seems to be 20-30% faster than using **numpy** directly. @@ -321,18 +326,18 @@ print(info) ### **pandas** -**pandas** backend yield data as data frames using the `pandas.io.sql` module. `dlt` use **pyarrow** dtypes by default as they generate more stable typing. +The **pandas** backend yields data as data frames using the `pandas.io.sql` module. `dlt` uses **pyarrow** dtypes by default as they generate more stable typing. -With default settings, several database types will be coerced to dtypes in yielded data frame: -* **decimal** are mapped to doubles so it is possible to lose precision. 
+With the default settings, several database types will be coerced to dtypes in the yielded data frame: +* **decimal** is mapped to doubles so it is possible to lose precision * **date** and **time** are mapped to strings -* all types are nullable. +* all types are nullable Note: `dlt` will still use the reflected source database types to create destination tables. It is up to the destination to reconcile / parse -type differences. Most of the destinations will be able to parse date/time strings and convert doubles into decimals (Please note that you' still lose precision on decimals with default settings.). **However we strongly suggest +type differences. Most of the destinations will be able to parse date/time strings and convert doubles into decimals (Please note that you'll still lose precision on decimals with default settings.). **However we strongly suggest not to use pandas backend if your source tables contain date, time or decimal columns** -Example: Use `backend_kwargs` to pass [backend-specific settings](https://pandas.pydata.org/docs/reference/api/pandas.read_sql_table.html) ie. `coerce_float`. Internally dlt uses `pandas.io.sql._wrap_result` to generate panda frames. +To adjust [backend-specific settings,](https://pandas.pydata.org/docs/reference/api/pandas.read_sql_table.html) pass it in the `backend_kwargs` parameter. For example, below we set `coerce_float` to `False`: ```py import sqlalchemy as sa @@ -360,21 +365,21 @@ print(info) ``` ### **connectorx** -[connectorx](https://sfu-db.github.io/connector-x/intro.html) backend completely skips **sqlalchemy** when reading table rows, in favor of doing that in rust. This is claimed to be significantly faster than any other method (confirmed only on postgres - see next chapter). With the default settings it will emit **pyarrow** tables, but you can configure it via **backend_kwargs**. +The [connectorx](https://sfu-db.github.io/connector-x/intro.html) backend completely skips **sqlalchemy** when reading table rows, in favor of doing that in rust. This is claimed to be significantly faster than any other method (validated only on postgres). With the default settings it will emit **pyarrow** tables, but you can configure it via **backend_kwargs**. There are certain limitations when using this backend: * it will ignore `chunk_size`. **connectorx** cannot yield data in batches. -* in many cases it requires a connection string that differs from **sqlalchemy** connection string. Use `conn` argument in **backend_kwargs** to set it up. -* it will convert **decimals** to **doubles** so you'll will lose precision. +* in many cases it requires a connection string that differs from the **sqlalchemy** connection string. Use the `conn` argument in **backend_kwargs** to set this. +* it will convert **decimals** to **doubles**, so you'll will lose precision. * nullability of the columns is ignored (always true) * it uses different database type mappings for each database type. [check here for more details](https://sfu-db.github.io/connector-x/databases.html) -* JSON fields (at least those coming from postgres) are double wrapped in strings. Here's a transform to be added with `add_map` that will unwrap it: +* JSON fields (at least those coming from postgres) are double wrapped in strings. 
To unwrap this, you can pass the in-built transformation function `unwrap_json_connector_x` (for example, with `add_map`): -```py -from sources.sql_database.helpers import unwrap_json_connector_x -``` + ```py + from sources.sql_database.helpers import unwrap_json_connector_x + ``` -Note: `dlt` will still use the reflected source database types to create destination tables. It is up to the destination to reconcile / parse type differences. Please note that you' still lose precision on decimals with default settings. +Note: `dlt` will still use the reflected source database types to create destination tables. It is up to the destination to reconcile / parse type differences. Please note that you'll still lose precision on decimals with default settings. ```py """Uses unsw_flow dataset (~2mln rows, 25+ columns) to test connectorx speed""" @@ -409,26 +414,30 @@ info = pipeline.run( ) print(info) ``` -With dataset above and local postgres instance, connectorx is 2x faster than pyarrow backend. +With the dataset above and a local postgres instance, the connectorx backend is 2x faster than the pyarrow backend. -### Troubleshooting with backends +### Specific database notes #### Oracle -1. When using **oracledb** dialect in thin mode we are getting protocol errors. Use thick mode or **cx_oracle** (old) client. +1. When using the **oracledb** dialect in thin mode we are getting protocol errors. Use thick mode or **cx_oracle** (old) client. 2. Mind that **sqlalchemy** translates Oracle identifiers into lower case! Keep the default `dlt` naming convention (`snake_case`) when loading data. We'll support more naming conventions soon. -3. Connectorx is for some reason slower for Oracle than `pyarrow` backend. +3. Connectorx is for some reason slower for Oracle than the `pyarrow` backend. + +See [here](https://github.com/dlt-hub/sql_database_benchmarking/tree/main/oracledb#installing-and-setting-up-oracle-db) for information and code on setting up and benchmarking on Oracle. #### DB2 1. Mind that **sqlalchemy** translates DB2 identifiers into lower case! Keep the default `dlt` naming convention (`snake_case`) when loading data. We'll support more naming conventions soon. -2. DB2 `DOUBLE` type is mapped to `Numeric` SqlAlchemy type with default precision, still `float` python types are returned. That requires `dlt` to perform additional casts. The cost of the cast however is minuscule compared to the cost of reading rows from database +2. The DB2 type `DOUBLE` gets incorrectly mapped to the python type `float` (instead of the SqlAlchemy type `Numeric` with default precision). This requires `dlt` to perform additional casts. The cost of the cast, however, is minuscule compared to the cost of reading rows from database. + +See [here](https://github.com/dlt-hub/sql_database_benchmarking/tree/main/db2#installing-and-setting-up-db2) for information and code on setting up and benchmarking on db2. #### MySQL -1. SqlAlchemy dialect converts doubles to decimals, we disable that behavior via table adapter in our demo pipeline +1. The **SqlAlchemy** dialect converts doubles to decimals. (This can be disabled via the table adapter argument as shown in the code example [here](#pyarrow)) #### Postgres / MSSQL -No issues found. Postgres is the only backend where we observed 2x speedup with connector x. On other db systems it performs same as `pyarrrow` backend or slower. +No issues were found for these databases. 
Postgres is the only backend where we observed 2x speedup with connectorx (see [here](https://github.com/dlt-hub/sql_database_benchmarking/tree/main/postgres) for the benchmarking code). On other db systems it performs the same as (or some times worse than) the `pyarrrow` backend. -## Additional configurations +## Advanced configuration ### Incremental Loading @@ -445,63 +454,49 @@ Incremental loading uses a cursor column (e.g., timestamp or auto-incrementing I certain range. 1. **Order returned rows**. Set `row_order` to `asc` or `desc` to order returned rows. -#### Example -1. Consider a table with a `last_modified` timestamp column. By setting this column as your cursor and specifying an - initial value, the loader generates a SQL query filtering rows with `last_modified` values greater than the specified initial value. +#### Examples +1. Incremental loading with the resource `sql_table` + Conside a table "family" with a timestamp column "last_modified" that indicates when a row was last modified. To ensure that only rows modified after midnight (00:00:00) on January 1, 2024, are loaded, you would set "last_modified" timestamp as the cursor as follows: ```py from sql_database import sql_table from datetime import datetime # Example: Incrementally loading a table based on a timestamp column table = sql_table( - table='your_table_name', + table='family', incremental=dlt.sources.incremental( 'last_modified', # Cursor column name - initial_value=datetime(2024, 1, 1) # Initial cursor value + initial_value=pendulum.DateTime(2024, 1, 1, 0, 0, 0) # Initial cursor value ) ) info = pipeline.extract(table, write_disposition="merge") print(info) ``` - -1. To incrementally load the "family" table using the sql_database source method: - - ```py - source = sql_database().with_resources("family") - #using the "updated" field as an incremental field using initial value of January 1, 2022, at midnight - source.family.apply_hints(incremental=dlt.sources.incremental("updated"),initial_value=pendulum.DateTime(2022, 1, 1, 0, 0, 0)) - #running the pipeline - info = pipeline.run(source, write_disposition="merge") - print(info) - ``` - In this example, we load data from the `family` table, using the `updated` column for incremental loading. In the first run, the process loads all data starting from midnight (00:00:00) on January 1, 2022. Subsequent runs perform incremental loading, guided by the values in the `updated` field. - -1. To incrementally load the "family" table using the 'sql_table' resource. - - ```py - family = sql_table( - table="family", - incremental=dlt.sources.incremental( - "updated", initial_value=pendulum.datetime(2022, 1, 1, 0, 0, 0) - ), - ) - # Running the pipeline - info = pipeline.extract(family, write_disposition="merge") - print(info) - ``` - - This process initially loads all data from the `family` table starting at midnight on January 1, 2022. For later runs, it uses the `updated` field for incremental loading as well. + Behind the scene, the loader generates a SQL query filtering rows with `last_modified` values greater than the incremental value. In the first run, this is the initial value (midnight (00:00:00) January 1, 2024). + In subsequent runs, it is the latest value of "last_modified" that `dlt` stores in [state](https://dlthub.com/docs/general-usage/state). + +2. 
Incremental loading with the source `sql_database` + To achieve the same using the `sql_database` source, you would specify your cursor as follows: + + ```py + source = sql_database().with_resources("family") + #using the "last_modified" field as an incremental field using initial value of midnight January 1, 2024 + source.family.apply_hints(incremental=dlt.sources.incremental("updated"),initial_value=pendulum.DateTime(2024, 1, 1, 0, 0, 0)) + #running the pipeline + info = pipeline.run(source, write_disposition="merge") + print(info) + ``` :::info - * For merge write disposition, the source table needs a primary key, which `dlt` automatically sets up. + * For merge write disposition, the source table needs a primary key, which `dlt` automatically sets up. * `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appended, or replaced resources. ::: ### Parallelize extraction -You can extract each table in a separate thread (no multiprocessing at this point). This will decrease loading time if your queries take time to execute or your network latency/speed is low. +You can extract each table in a separate thread (no multiprocessing at this point). This will decrease loading time if your queries take time to execute or your network latency/speed is low. To enable this, declare your sources/resources as follows: ```py database = sql_database().parallelize() table = sql_table().parallelize() @@ -517,29 +512,24 @@ The `reflection_level` argument controls how much information is reflected: - `reflection_level = "full"`: Column names, nullability, and data types are detected. For decimal types we always add precision and scale. **This is the default.** - `reflection_level = "full_with_precision"`: Column names, nullability, data types, and precision/scale are detected, also for types like text and binary. Integer sizes are set to bigint and to int for all other types. -If the SQL type is unknown or not supported by `dlt` the column is skipped when using the `pyarrow` backend. -In other backend the type is inferred from data regardless of `reflection_level`, this often works, some types are coerced to strings -and `dataclass` based values from sqlalchemy are inferred as `complex` (JSON in most destinations). - +If the SQL type is unknown or not supported by `dlt`, then, in the pyarrow backend, the column will be skipped, whereas in the other backends the type will be inferred directly from the data irrespective of the `reflection_level` specified. In the latter case, this often means that some types are coerced to strings and `dataclass` based values from sqlalchemy are inferred as `complex` (JSON in most destinations). :::tip -If you use **full** (and above) reflection level you may encounter a situation where the data returned by sql alchemy or pyarrow backend -does not match the reflected data types. Most common symptoms are: +If you use reflection level **full** / **full_with_precision** you may encounter a situation where the data returned by sqlalchemy or pyarrow backend does not match the reflected data types. Most common symptoms are: 1. The destination complains that it cannot cast one type to another for a certain column. For example `connector-x` returns TIME in nanoseconds and BigQuery sees it as bigint and fails to load. -2. 
You get `SchemaCorruptedException` or other coercion error during `normalize` step. +2. You get `SchemaCorruptedException` or other coercion error during the `normalize` step. In that case you may try **minimal** reflection level where all data types are inferred from the returned data. From our experience this prevents most of the coercion problems. ::: -You can also override the sql type by passing a `type_adapter_callback` function. -This function takes an `sqlalchemy` data type and returns a new type (or `None` to force the column to be inferred from the data). +You can also override the sql type by passing a `type_adapter_callback` function. This function takes a `sqlalchemy` data type as input and returns a new type (or `None` to force the column to be inferred from the data) as output. -This is useful for example when: +This is useful, for example, when: - You're loading a data type which is not supported by the destination (e.g. you need JSON type columns to be coerced to string) -- You're using an sqlalchemy dialect which uses custom types that don't inherit from standard sqlalchemy types. +- You're using a sqlalchemy dialect which uses custom types that don't inherit from standard sqlalchemy types. - For certain types you prefer `dlt` to infer data type from the data and you return `None` -Example, when loading timestamps from Snowflake you can make sure they translate to `timestamp` columns in the result schema: +Example, when loading timestamps from Snowflake, you ensure that they get translated into standard sqlalchemy `timestamp` columns in the resultant schema: ```py import dlt @@ -562,45 +552,49 @@ dlt.pipeline("demo").run(source) ``` ### Configuring with toml/environment variables -You are able to configure most of the arguments to `sql_database` and `sql_table` via toml files and environment variables. This is particularly useful with `sql_table` -because you can maintain a separate configuration for each table (below we show **secrets.toml** and **config.toml**, you are free to combine them into one.): -```toml -[sources.sql_database] -credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server" -``` +You can set most of the arguments of `sql_database()` and `sql_table()` directly in the `.toml` files and/or as environment variables. `dlt` automatically injects these values into the pipeline script. -```toml -[sources.sql_database.chat_message] -backend="pandas" -chunk_size=1000 +This is particularly useful with `sql_table()` because you can maintain a separate configuration for each table (below we show **secrets.toml** and **config.toml**, you are free to combine them into one): -[sources.sql_database.chat_message.incremental] -cursor_path="updated_at" -``` -Example above will setup **backend** and **chunk_size** for a table with name **chat_message**. It will also enable incremental loading on a column named **updated_at**. -Table resource is instantiated as follows: -```py -table = sql_table(table="chat_message", schema="data") -``` +The examples below show how you can set arguments in any of the `.toml` files (`secrets.toml` or `config.toml`): +1. Specifying connection string: + ```toml + [sources.sql_database] + credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server" + ``` +2. 
Setting parameters like backend, chunk_size, and incremental column for the table `chat_message`: + ```toml + [sources.sql_database.chat_message] + backend="pandas" + chunk_size=1000 -Similarly, you can configure `sql_database` source. -```toml -[sources.sql_database] -credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server" -schema="data" -backend="pandas" -chunk_size=1000 - -[sources.sql_database.chat_message.incremental] -cursor_path="updated_at" -``` -Note that we are able to configure incremental loading per table, even if it is a part of a dlt source. Source below will extract data using **pandas** backend -with **chunk_size** 1000. **chat_message** table will load data incrementally using **updated_at** column. All other tables will load fully. -```py -database = sql_database() -``` + [sources.sql_database.chat_message.incremental] + cursor_path="updated_at" + ``` + This is especially useful with `sql_table()` in a situation where you may want to run this resource for multiple tables. Setting parameters like this would then give you a clean way of maintaing separate configurations for each table. + +3. Handling separate configurations for database and individual tables + When using the `sql_database()` source, you can separately configure the parameters for the database and for the individual tables. + ```toml + [sources.sql_database] + credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server" + schema="data" + backend="pandas" + chunk_size=1000 + + [sources.sql_database.chat_message.incremental] + cursor_path="updated_at" + ``` -You can configure all the arguments this way (except adapter callback function). [Standard dlt rules apply](https://dlthub.com/docs/general-usage/credentials/configuration#configure-dlt-sources-and-resources). You can use environment variables [by translating the names properly](https://dlthub.com/docs/general-usage/credentials/config_providers#toml-vs-environment-variables) ie. + The resulting source created below will extract data using **pandas** backend with **chunk_size** 1000. The table **chat_message** will load data incrementally using **updated_at** column. All the other tables will not use incremental loading, and will instead load the full data. + + ```py + database = sql_database() + ``` + +You'll be able to configure all the arguments this way (except adapter callback function). [Standard dlt rules apply](https://dlthub.com/docs/general-usage/credentials/configuration#configure-dlt-sources-and-resources). + +It is also possible to set these arguments as environment variables [using the proper naming convention](https://dlthub.com/docs/general-usage/credentials/config_providers#toml-vs-environment-variables): ```sh SOURCES__SQL_DATABASE__CREDENTIALS="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server" SOURCES__SQL_DATABASE__BACKEND=pandas @@ -611,79 +605,51 @@ SOURCES__SQL_DATABASE__CHAT_MESSAGE__INCREMENTAL__CURSOR_PATH=updated_at ## Extended Usage ### Running on Airflow -When running on Airflow -1. Use `dlt` [Airflow Helper](../../walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md#2-modify-dag-file) to create tasks from `sql_database` source. You should be able to run table extraction in parallel with `parallel-isolated` source->DAG conversion. +When running on Airflow: +1. 
Use the `dlt` [Airflow Helper](../../walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md#2-modify-dag-file) to create tasks from the `sql_database` source. You should be able to run table extraction in parallel with `parallel-isolated` source->DAG conversion. 2. Reflect tables at runtime with `defer_table_reflect` argument. 3. Set `allow_external_schedulers` to load data using [Airflow intervals](../../general-usage/incremental-loading.md#using-airflow-schedule-for-backfill-and-incremental-loading). -### Transforming data before load -You have direct access to all resources (that represent tables) and you can modify hints, add python transforms, parallelize execution etc. as for any other -resource. Below we show you an example on how to pseudonymize the data before it is loaded by using deterministic hashing. - -1. Configure the pipeline by specifying the pipeline name, destination, and dataset as follows: - - ```py - pipeline = dlt.pipeline( - pipeline_name="rfam", # Use a custom name if desired - destination="duckdb", # Choose the appropriate destination (e.g., duckdb, redshift, post) - dataset_name="rfam_data" # Use a custom name if desired - ) - ``` - -1. Pass your credentials using any of the methods [described above](#add-credentials). - -1. To load the entire database, use the `sql_database` source as: +### Transforming the data before load +You have direct access to the extracted data through the resource objects (`sql_table()` or `sql_database().with_resource())`), each of which represents a single SQL table. These objects are generators that yield +individual rows of the table which can be modified by using custom python functions. These functions can be applied to the resource using `add_map`. + + +Examples: +1. Pseudonymizing data to hide personally identifiable information (PII) before loading it to the destination. (See [here](https://dlthub.com/docs/general-usage/customising-pipelines/pseudonymizing_columns) for more information on pseudonymizing data with `dlt`) + + + ```py + import hashlib + + def pseudonymize_name(doc): + ''' + Pseudonmyisation is a deterministic type of PII-obscuring + Its role is to allow identifying users by their hash, + without revealing the underlying info. + ''' + # add a constant salt to generate + salt = 'WI@N57%zZrmk#88c' + salted_string = doc['rfam_acc'] + salt + sh = hashlib.sha256() + sh.update(salted_string.encode()) + hashed_string = sh.digest().hex() + doc['rfam_acc'] = hashed_string + return doc - ```py - source = sql_database() - info = pipeline.run(source, write_disposition="replace") - print(info) - ``` - -1. If you just need the "family" table, use: - - ```py - source = sql_database().with_resources("family") - #running the pipeline - info = pipeline.run(source, write_disposition="replace") - print(info) - ``` - -1. To pseudonymize columns and hide personally identifiable information (PII), refer to the - [documentation](https://dlthub.com/docs/general-usage/customising-pipelines/pseudonymizing_columns). - As an example, here's how to pseudonymize the "rfam_acc" column in the "family" table: - - ```py - import hashlib - - def pseudonymize_name(doc): - ''' - Pseudonmyisation is a deterministic type of PII-obscuring - Its role is to allow identifying users by their hash, - without revealing the underlying info. 
- ''' - # add a constant salt to generate - salt = 'WI@N57%zZrmk#88c' - salted_string = doc['rfam_acc'] + salt - sh = hashlib.sha256() - sh.update(salted_string.encode()) - hashed_string = sh.digest().hex() - doc['rfam_acc'] = hashed_string - return doc - - pipeline = dlt.pipeline( - # Configure the pipeline - ) - # using sql_database source to load family table and pseudonymize the column "rfam_acc" - source = sql_database().with_resources("family") - # modify this source instance's resource - source = source.family.add_map(pseudonymize_name) - # Run the pipeline. For a large db this may take a while - info = pipeline.run(source, write_disposition="replace") - print(info) - ``` + pipeline = dlt.pipeline( + # Configure the pipeline + ) + # using sql_database source to load family table and pseudonymize the column "rfam_acc" + source = sql_database().with_resources("family") + # modify this source instance's resource + source = source.family.add_map(pseudonymize_name) + # Run the pipeline. For a large db this may take a while + info = pipeline.run(source, write_disposition="replace") + print(info) + ``` -1. To exclude columns, such as the "rfam_id" column from the "family" table before loading: +1. Excluding unnecessary columns before load ```py def remove_columns(doc): @@ -702,6 +668,4 @@ resource. Below we show you an example on how to pseudonymize the data before it print(info) ``` -1. Remember to keep the pipeline name and destination dataset name consistent. The pipeline name is crucial for retrieving the [state](https://dlthub.com/docs/general-usage/state) from the last run, which is essential for incremental loading. Altering these names could initiate a "[full_refresh](https://dlthub.com/docs/general-usage/pipeline#do-experiments-with-full-refresh)", interfering with the metadata tracking necessary for [incremental loads](https://dlthub.com/docs/general-usage/incremental-loading). - From 18fd096d870ec767f86eca47413dea91857aa08b Mon Sep 17 00:00:00 2001 From: Rahul Joshi Date: Mon, 5 Aug 2024 16:29:38 +0200 Subject: [PATCH 05/13] fixing formatting --- .../verified-sources/sql_database.md | 42 ++++++++++--------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md index f795f4a37d..d06cc9ea9a 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md @@ -126,6 +126,7 @@ Read more about sources and resources here: [General Usage: Source](../../genera 1. **Load all the tables from a database** Calling `sql_database()` loads all tables from the database. + ```py def load_entire_database() -> None: @@ -147,6 +148,7 @@ Calling `sql_database()` loads all tables from the database. ``` 2. **Load select tables from a database** Calling `sql_database().with_resources("family", "clan")` loads only the tables `"family"` and `"clan"` from the database. + ```py def load_select_tables_from_database() -> None: @@ -170,6 +172,7 @@ Calling `sql_database().with_resources("family", "clan")` loads only the tables 3. **Load a standalone table** Calling `sql_table(table="family")` fetches only the table `"family"` + ```py def load_select_tables_from_database() -> None: @@ -192,7 +195,8 @@ Calling `sql_table(table="family")` fetches only the table `"family"` ``` :::tip -We intend our sources to be fully hackable. 
Feel free to change the source code of the sources and resources to customize it to your needs. ::: +We intend our sources to be fully hackable. Feel free to change the source code of the sources and resources to customize it to your needs. +::: ## Configuring connection to the SQL database @@ -227,7 +231,8 @@ credentials = ConnectionStringCredentials( source = sql_table(credentials).with_resource("family") ``` -:::Note: It is recommended to configure credentials in `.dlt/secrets.toml` and to not include any sensitive information in the pipeline code::: +::: +Note: It is recommended to configure credentials in `.dlt/secrets.toml` and to not include any sensitive information in the pipeline code. ::: ### Other connection options #### Using SqlAlchemy Engine as credentials @@ -618,7 +623,6 @@ individual rows of the table which can be modified by using custom python functi Examples: 1. Pseudonymizing data to hide personally identifiable information (PII) before loading it to the destination. (See [here](https://dlthub.com/docs/general-usage/customising-pipelines/pseudonymizing_columns) for more information on pseudonymizing data with `dlt`) - ```py import hashlib @@ -649,23 +653,23 @@ Examples: print(info) ``` -1. Excluding unnecessary columns before load +2. Excluding unnecessary columns before load - ```py - def remove_columns(doc): - del doc["rfam_id"] - return doc + ```py + def remove_columns(doc): + del doc["rfam_id"] + return doc - pipeline = dlt.pipeline( - # Configure the pipeline - ) - # using sql_database source to load family table and remove the column "rfam_id" - source = sql_database().with_resources("family") - # modify this source instance's resource - source = source.family.add_map(remove_columns) - # Run the pipeline. For a large db this may take a while - info = pipeline.run(source, write_disposition="replace") - print(info) - ``` + pipeline = dlt.pipeline( + # Configure the pipeline + ) + # using sql_database source to load family table and remove the column "rfam_id" + source = sql_database().with_resources("family") + # modify this source instance's resource + source = source.family.add_map(remove_columns) + # Run the pipeline. For a large db this may take a while + info = pipeline.run(source, write_disposition="replace") + print(info) + ``` From cbca233bc7a35f32d6ab7211d76e58aa9b7700e4 Mon Sep 17 00:00:00 2001 From: Rahul Joshi Date: Mon, 5 Aug 2024 16:48:14 +0200 Subject: [PATCH 06/13] fixing code formatting --- .../docs/dlt-ecosystem/verified-sources/sql_database.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md index d06cc9ea9a..0c721fafe8 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md @@ -125,7 +125,7 @@ Read more about sources and resources here: [General Usage: Source](../../genera #### Examples: 1. **Load all the tables from a database** -Calling `sql_database()` loads all tables from the database. +Calling `sql_database()` loads all tables from the database. ```py def load_entire_database() -> None: @@ -145,9 +145,10 @@ Calling `sql_database()` loads all tables from the database. # Print load info print(info) - ``` + ``` + 2. **Load select tables from a database** -Calling `sql_database().with_resources("family", "clan")` loads only the tables `"family"` and `"clan"` from the database. 
+Calling `sql_database().with_resources("family", "clan")` loads only the tables `"family"` and `"clan"` from the database. ```py def load_select_tables_from_database() -> None: @@ -168,7 +169,7 @@ Calling `sql_database().with_resources("family", "clan")` loads only the tables # Print load info print(info) - ``` + ``` 3. **Load a standalone table** Calling `sql_table(table="family")` fetches only the table `"family"` From 0c6c6433f58af808a81b7ab873cf1b338f2f32a7 Mon Sep 17 00:00:00 2001 From: Rahul Joshi Date: Mon, 5 Aug 2024 17:03:27 +0200 Subject: [PATCH 07/13] fixing indentation --- .../docs/dlt-ecosystem/verified-sources/sql_database.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md index 0c721fafe8..aced65384d 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md @@ -125,7 +125,7 @@ Read more about sources and resources here: [General Usage: Source](../../genera #### Examples: 1. **Load all the tables from a database** -Calling `sql_database()` loads all tables from the database. + Calling `sql_database()` loads all tables from the database. ```py def load_entire_database() -> None: @@ -148,7 +148,7 @@ Calling `sql_database()` loads all tables from the database. ``` 2. **Load select tables from a database** -Calling `sql_database().with_resources("family", "clan")` loads only the tables `"family"` and `"clan"` from the database. + Calling `sql_database().with_resources("family", "clan")` loads only the tables `"family"` and `"clan"` from the database. ```py def load_select_tables_from_database() -> None: @@ -172,7 +172,7 @@ Calling `sql_database().with_resources("family", "clan")` loads only the tables ``` 3. 
**Load a standalone table** -Calling `sql_table(table="family")` fetches only the table `"family"` + Calling `sql_table(table="family")` fetches only the table `"family"` ```py def load_select_tables_from_database() -> None: From 882bcdebf9ad62ea810b7642aa2d32fd98a199ae Mon Sep 17 00:00:00 2001 From: rahuljo Date: Mon, 9 Sep 2024 09:53:53 +0200 Subject: [PATCH 08/13] modifying based on comments and splitting into multiple pages --- .../verified-sources/sql_database.md | 676 ------------------ .../verified-sources/sql_database/advanced.md | 174 +++++ .../sql_database/configuration.md | 292 ++++++++ .../verified-sources/sql_database/index.md | 51 ++ .../verified-sources/sql_database/setup.md | 76 ++ .../sql_database/troubleshooting.md | 89 +++ .../verified-sources/sql_database/usage.md | 102 +++ docs/website/sidebars.js | 16 +- 8 files changed, 799 insertions(+), 677 deletions(-) delete mode 100644 docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md create mode 100644 docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md create mode 100644 docs/website/docs/dlt-ecosystem/verified-sources/sql_database/configuration.md create mode 100644 docs/website/docs/dlt-ecosystem/verified-sources/sql_database/index.md create mode 100644 docs/website/docs/dlt-ecosystem/verified-sources/sql_database/setup.md create mode 100644 docs/website/docs/dlt-ecosystem/verified-sources/sql_database/troubleshooting.md create mode 100644 docs/website/docs/dlt-ecosystem/verified-sources/sql_database/usage.md diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md deleted file mode 100644 index aced65384d..0000000000 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md +++ /dev/null @@ -1,676 +0,0 @@ ---- -title: 30+ SQL Databases -description: dlt pipeline for SQL Database -keywords: [sql connector, sql database pipeline, sql database] ---- -import Header from './_source-info-header.md'; - -# 30+ SQL Databases - -
- -SQL databases are management systems (DBMS) that store data in a structured format, commonly used -for efficient and reliable data retrieval. - -Our SQL Database verified source loads data to your specified destination using SQLAlchemy, pyarrow, pandas, or ConnectorX - -:::tip -View the pipeline example [here](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py). -::: - -Sources and resources that can be loaded using this verified source are: - -| Name | Description | -| ------------ | -------------------------------------------------------------------- | -| sql_database | Reflects the tables and views in SQL database and retrieves the data | -| sql_table | Retrieves data from a particular SQL database table | -| | | - -### Supported databases - -We support all [SQLAlchemy dialects](https://docs.sqlalchemy.org/en/20/dialects/), which include, but are not limited to, the following database engines: - -* [PostgreSQL](#postgres--mssql) -* [MySQL](#mysql) -* SQLite -* [Oracle](#oracle) -* [Microsoft SQL Server](#postgres--mssql) -* MariaDB -* [IBM DB2 and Informix](#db2) -* Google BigQuery -* Snowflake -* Redshift -* Apache Hive and Presto -* SAP Hana -* CockroachDB -* Firebird -* Teradata Vantage - -:::note -Note that there many unofficial dialects, such as [DuckDB](https://duckdb.org/). -::: - -## Setup - -To connect to your SQL database using `dlt` follow these steps: - -1. Initialize a `dlt` project in the current working directory by running the following command: - - ```sh - dlt init sql_database duckdb - ``` - - This will add necessary files and configurations for a `dlt` pipeline with SQL database as the [source](../../general-usage/source) and - [DuckDB](../destinations/duckdb.md) as the [destination](../destinations). - - :::tip - If you'd like to use a different destination, simply replace `duckdb` with the name of your preferred [destination](../destinations). - ::: - -2. Add credentials for your SQL database - - To connect to your SQL database, `dlt` would need to authenticate using necessary credentials. To enable this, paste your credentials in the `secrets.toml` file created inside the `.dlt/` folder in the following format: - ```toml - [sources.sql_database.credentials] - drivername = "mysql+pymysql" # driver name for the database - database = "Rfam" # database name - username = "rfamro" # username associated with the database - host = "mysql-rfam-public.ebi.ac.uk" # host address - port = "4497" # port required for connection - ``` - - Alternatively, you can also authenticate using connection strings: - ```toml - [sources.sql_database.credentials] - credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam" - ``` - - To learn more about how to pass credentials into your `dlt` pipeline see [here](../../walkthroughs/add_credentials.md). - -3. Add credentials for your destination (if necessary) - - Depending on which [destination](../destinations) you're loading into, you might also need to add your destination credentials. For more information read the [General Usage: Credentials.](../../general-usage/credentials) - -4. Install any necessary dependencies - - ```sh - pip install -r requirements.txt - ``` - -5. Run the pipeline - - ```sh - python sql_database_pipeline.py - ``` - - -6. Make sure everything is loaded as expected with - ```sh - dlt pipeline show - ``` - - :::note - The pipeline_name for the above example is `rfam`, you may also use any - custom name instead. 
::: - - -## How to use - -The SQL Database verified source has two in-built sources and resources: -1. `sql_database`: a `dlt` source which can be used to load multiple tables and views from a SQL database -2. `sql_table`: a `dlt` resource that loads a single table from the SQL database - -Read more about sources and resources here: [General Usage: Source](../../general-usage/source.md) and [General Usage: Resource](../../general-usage/resource.md). - -#### Examples: - -1. **Load all the tables from a database** - Calling `sql_database()` loads all tables from the database. - - ```py - def load_entire_database() -> None: - - # Define the pipeline - pipeline = dlt.pipeline( - pipeline_name="rfam", - destination='synapse', - dataset_name="rfam_data" - ) - - # Fetch all the tables from the database - source = sql_database() - - # Run the pipeline - info = pipeline.run(source, write_disposition="replace") - - # Print load info - print(info) - ``` - -2. **Load select tables from a database** - Calling `sql_database().with_resources("family", "clan")` loads only the tables `"family"` and `"clan"` from the database. - - ```py - def load_select_tables_from_database() -> None: - - # Define the pipeline - pipeline = dlt.pipeline( - pipeline_name="rfam", - destination="postgres", - dataset_name="rfam_data" - ) - - # Fetch tables "family" and "clan" - source = sql_database().with_resources("family", "clan") - - # Run the pipeline - info = pipeline.run(source) - - # Print load info - print(info) - - ``` - -3. **Load a standalone table** - Calling `sql_table(table="family")` fetches only the table `"family"` - - ```py - def load_select_tables_from_database() -> None: - - # Define the pipeline - pipeline = dlt.pipeline( - pipeline_name="rfam", - destination="duckdb", - dataset_name="rfam_data" - ) - - # Fetch the table "family" - table = sql_table(table="family") - - # Run the pipeline - info = pipeline.run(table) - - # Print load info - print(info) - - ``` - -:::tip -We intend our sources to be fully hackable. Feel free to change the source code of the sources and resources to customize it to your needs. -::: - - -## Configuring connection to the SQL database - -### Connection string format -`sql_database` uses SQLAlchemy to create database connections and reflect table schemas. You can pass credentials using -[database urls](https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls). For example, to use the `pymysql` dialect to connect to a `myssql` database `Rfam` with user name `rfmaro` password `PWD` host `mysql-rfam-public.ebi.ac.uk` and port `4497`, you would construct your connection string as follows: - -"mysql+pymysql://rfamro:PWD@mysql-rfam-public.ebi.ac.uk:4497/Rfam"` - -Database-specific drivers can be passed into the connection string using query parameters. For example, to connect to Microsoft SQL Server using the ODBC Driver, you would need to pass the driver as a query parameter as follows: - -"mssql+pyodbc://username:password@server/database?driver=ODBC+Driver+17+for+SQL+Server" - - -### Adding credentials to the `dlt` pipeline - -#### Setting them in `secrets.toml` or as environment variables (Recommended) - -By default, `dlt` looks for credentials inside `.dlt/secrets.toml` or in the environment variables. See Step 2 of the [setup](#setup) for how to set credentials inside `secrets.toml`. For more information on passing credentials through `.toml` or as enviroment variables, read [here](../../walkthroughs/add_credentials.md). 
- - -#### Passing them directly in the script -It is also possible to explicitly pass credentials inside the source. Example: -```py -from dlt.sources.credentials import ConnectionStringCredentials -from sql_database import sql_table - -credentials = ConnectionStringCredentials( - "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam" -) - -source = sql_table(credentials).with_resource("family") -``` -::: -Note: It is recommended to configure credentials in `.dlt/secrets.toml` and to not include any sensitive information in the pipeline code. ::: - -### Other connection options -#### Using SqlAlchemy Engine as credentials -You are able to pass an instance of SqlAlchemy Engine instead of credentials: -```py -from sqlalchemy import create_engine - -engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam") -table = sql_table(engine, table="chat_message", schema="data") -``` -This engine is used by `dlt` to open database connections and can work across multiple threads so is compatible with `parallelize` setting of dlt sources and resources. - -#### Connect to mysql with SSL -Here, we use the `mysql` and `pymysql` dialects to set up an SSL connection to a server, with all information taken from the [SQLAlchemy docs](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#ssl-connections). - -1. To enforce SSL on the client without a client certificate you may pass the following DSN: - - ```toml - sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca=" - ``` - -1. You can also pass the server's public certificate (potentially bundled with your pipeline) and disable host name checks: - - ```toml - sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca=server-ca.pem&ssl_check_hostname=false" - ``` - -1. For servers requiring a client certificate, provide the client's private key (a secret value). In Airflow, this is usually saved as a variable and exported to a file before use. The server certificate is omitted in the example below: - - ```toml - sources.sql_database.credentials="mysql+pymysql://root:@35.203.96.191:3306/mysql?ssl_ca=&ssl_cert=client-cert.pem&ssl_key=client-key.pem" - ``` - -#### SQL Server connection options - -**To connect to an `mssql` server using Windows authentication**, include `trusted_connection=yes` in the connection string. - -```toml -sources.sql_database.credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server" -``` - -**To connect to a local sql server instance running without SSL** pass `encrypt=no` parameter: -```toml -sources.sql_database.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?encrypt=no&driver=ODBC+Driver+17+for+SQL+Server" -``` - -**To allow self signed SSL certificate** when you are getting `certificate verify failed:unable to get local issuer certificate`: -```toml -sources.sql_database.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?TrustServerCertificate=yes&driver=ODBC+Driver+17+for+SQL+Server" -``` - -**To use long strings (>8k) and avoid collation errors**: -```toml -sources.sql_database.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?LongAsMax=yes&driver=ODBC+Driver+17+for+SQL+Server" -``` - -## Configuring the backend - -Table backends convert streams of rows from database tables into batches in various formats. The default backend **sqlalchemy** follows standard `dlt` behavior of -extracting and normalizing Python dictionaries. 
We recommend this for smaller tables, initial development work, and when minimal dependencies or a pure Python environment is required. This backend is also the slowest. Other backends make use of the structured data format of the tables and provide significant improvement in speeds. For example, the **pyarrow** backend converts rows into `arrow` tables, which results in -good performance and preserves exact database types. We recommend using this backend for larger tables. - -### **sqlalchemy** - -The **sqlalchemy** backend (the default) yields table data as a list of Python dictionaries. This data goes through regular extract -and normalize steps and does not require additional dependencies to be installed. It is the most robust (works with any destination, correctly represents data types) but also the slowest. You can set `reflection_level="full_with_precision"` to pass exact database types to `dlt` schema. - -### **pyarrow** - -The **pyarrow** backend yields data as Arrow tables. It uses **SqlAlchemy** to read rows in batches but then immediately converts them into `ndarray`, transposes it, and sets it as columns in an arrow table. This backend always fully -reflects the database table and preserves original types (i.e. **decimal** / **numeric** data will be extracted without loss of precision). If the destination loads parquet files, this backend will skip `dlt` normalizer and you can gain two orders of magnitude (20x - 30x) speed increase. - -Note that if **pandas** is installed, we'll use it to convert SqlAlchemy tuples into **ndarray** as it seems to be 20-30% faster than using **numpy** directly. - -```py -import sqlalchemy as sa -pipeline = dlt.pipeline( - pipeline_name="rfam_cx", destination="postgres", dataset_name="rfam_data_arrow" -) - -def _double_as_decimal_adapter(table: sa.Table) -> None: - """Emits decimals instead of floats.""" - for column in table.columns.values(): - if isinstance(column.type, sa.Float): - column.type.asdecimal = False - -sql_alchemy_source = sql_database( - "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam?&binary_prefix=true", - backend="pyarrow", - table_adapter_callback=_double_as_decimal_adapter -).with_resources("family", "genome") - -info = pipeline.run(sql_alchemy_source) -print(info) -``` - -### **pandas** - -The **pandas** backend yields data as data frames using the `pandas.io.sql` module. `dlt` uses **pyarrow** dtypes by default as they generate more stable typing. - -With the default settings, several database types will be coerced to dtypes in the yielded data frame: -* **decimal** is mapped to doubles so it is possible to lose precision -* **date** and **time** are mapped to strings -* all types are nullable - -Note: `dlt` will still use the reflected source database types to create destination tables. It is up to the destination to reconcile / parse -type differences. Most of the destinations will be able to parse date/time strings and convert doubles into decimals (Please note that you'll still lose precision on decimals with default settings.). **However we strongly suggest -not to use pandas backend if your source tables contain date, time or decimal columns** - -To adjust [backend-specific settings,](https://pandas.pydata.org/docs/reference/api/pandas.read_sql_table.html) pass it in the `backend_kwargs` parameter. 
For example, below we set `coerce_float` to `False`: - -```py -import sqlalchemy as sa -pipeline = dlt.pipeline( - pipeline_name="rfam_cx", destination="postgres", dataset_name="rfam_data_pandas_2" -) - -def _double_as_decimal_adapter(table: sa.Table) -> None: - """Emits decimals instead of floats.""" - for column in table.columns.values(): - if isinstance(column.type, sa.Float): - column.type.asdecimal = True - -sql_alchemy_source = sql_database( - "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam?&binary_prefix=true", - backend="pandas", - table_adapter_callback=_double_as_decimal_adapter, - chunk_size=100000, - # set coerce_float to False to represent them as string - backend_kwargs={"coerce_float": False, "dtype_backend": "numpy_nullable"}, -).with_resources("family", "genome") - -info = pipeline.run(sql_alchemy_source) -print(info) -``` - -### **connectorx** -The [connectorx](https://sfu-db.github.io/connector-x/intro.html) backend completely skips **sqlalchemy** when reading table rows, in favor of doing that in rust. This is claimed to be significantly faster than any other method (validated only on postgres). With the default settings it will emit **pyarrow** tables, but you can configure it via **backend_kwargs**. - -There are certain limitations when using this backend: -* it will ignore `chunk_size`. **connectorx** cannot yield data in batches. -* in many cases it requires a connection string that differs from the **sqlalchemy** connection string. Use the `conn` argument in **backend_kwargs** to set this. -* it will convert **decimals** to **doubles**, so you'll will lose precision. -* nullability of the columns is ignored (always true) -* it uses different database type mappings for each database type. [check here for more details](https://sfu-db.github.io/connector-x/databases.html) -* JSON fields (at least those coming from postgres) are double wrapped in strings. To unwrap this, you can pass the in-built transformation function `unwrap_json_connector_x` (for example, with `add_map`): - - ```py - from sources.sql_database.helpers import unwrap_json_connector_x - ``` - -Note: `dlt` will still use the reflected source database types to create destination tables. It is up to the destination to reconcile / parse type differences. Please note that you'll still lose precision on decimals with default settings. - -```py -"""Uses unsw_flow dataset (~2mln rows, 25+ columns) to test connectorx speed""" -import os -from dlt.destinations import filesystem - -unsw_table = sql_table( - "postgresql://loader:loader@localhost:5432/dlt_data", - "unsw_flow_7", - "speed_test", - # this is ignored by connectorx - chunk_size=100000, - backend="connectorx", - # keep source data types - reflection_level="full_with_precision", - # just to demonstrate how to setup a separate connection string for connectorx - backend_kwargs={"conn": "postgresql://loader:loader@localhost:5432/dlt_data"} -) - -pipeline = dlt.pipeline( - pipeline_name="unsw_download", - destination=filesystem(os.path.abspath("../_storage/unsw")), - progress="log", - dev_mode=True, -) - -info = pipeline.run( - unsw_table, - dataset_name="speed_test", - table_name="unsw_flow", - loader_file_format="parquet", -) -print(info) -``` -With the dataset above and a local postgres instance, the connectorx backend is 2x faster than the pyarrow backend. - -### Specific database notes - -#### Oracle -1. When using the **oracledb** dialect in thin mode we are getting protocol errors. Use thick mode or **cx_oracle** (old) client. -2. 
Mind that **sqlalchemy** translates Oracle identifiers into lower case! Keep the default `dlt` naming convention (`snake_case`) when loading data. We'll support more naming conventions soon. -3. Connectorx is for some reason slower for Oracle than the `pyarrow` backend. - -See [here](https://github.com/dlt-hub/sql_database_benchmarking/tree/main/oracledb#installing-and-setting-up-oracle-db) for information and code on setting up and benchmarking on Oracle. - -#### DB2 -1. Mind that **sqlalchemy** translates DB2 identifiers into lower case! Keep the default `dlt` naming convention (`snake_case`) when loading data. We'll support more naming conventions soon. -2. The DB2 type `DOUBLE` gets incorrectly mapped to the python type `float` (instead of the SqlAlchemy type `Numeric` with default precision). This requires `dlt` to perform additional casts. The cost of the cast, however, is minuscule compared to the cost of reading rows from database. - -See [here](https://github.com/dlt-hub/sql_database_benchmarking/tree/main/db2#installing-and-setting-up-db2) for information and code on setting up and benchmarking on db2. - -#### MySQL -1. The **SqlAlchemy** dialect converts doubles to decimals. (This can be disabled via the table adapter argument as shown in the code example [here](#pyarrow)) - -#### Postgres / MSSQL -No issues were found for these databases. Postgres is the only backend where we observed 2x speedup with connectorx (see [here](https://github.com/dlt-hub/sql_database_benchmarking/tree/main/postgres) for the benchmarking code). On other db systems it performs the same as (or some times worse than) the `pyarrrow` backend. - -## Advanced configuration - -### Incremental Loading - -Efficient data management often requires loading only new or updated data from your SQL databases, rather than reprocessing the entire dataset. This is where incremental loading comes into play. - -Incremental loading uses a cursor column (e.g., timestamp or auto-incrementing ID) to load only data newer than a specified initial value, enhancing efficiency by reducing processing time and resource use. - - -#### How to configure -1. **Choose a Cursor Column**: Identify a column in your SQL table that can serve as a reliable indicator of new or updated rows. Common choices include timestamp columns or auto-incrementing IDs. -1. **Set an Initial Value**: Choose a starting value for the cursor to begin loading data. This could be a specific timestamp or ID from which you wish to start loading data. -1. **Deduplication**: When using incremental loading, the system automatically handles the deduplication of rows based on the primary key (if available) or row hash for tables without a primary key. -1. **Set end_value for backfill**: Set `end_value` if you want to backfill data from -certain range. -1. **Order returned rows**. Set `row_order` to `asc` or `desc` to order returned rows. - -#### Examples - -1. Incremental loading with the resource `sql_table` - Conside a table "family" with a timestamp column "last_modified" that indicates when a row was last modified. 
To ensure that only rows modified after midnight (00:00:00) on January 1, 2024, are loaded, you would set "last_modified" timestamp as the cursor as follows: - ```py - from sql_database import sql_table - from datetime import datetime - - # Example: Incrementally loading a table based on a timestamp column - table = sql_table( - table='family', - incremental=dlt.sources.incremental( - 'last_modified', # Cursor column name - initial_value=pendulum.DateTime(2024, 1, 1, 0, 0, 0) # Initial cursor value - ) - ) - - info = pipeline.extract(table, write_disposition="merge") - print(info) - ``` - Behind the scene, the loader generates a SQL query filtering rows with `last_modified` values greater than the incremental value. In the first run, this is the initial value (midnight (00:00:00) January 1, 2024). - In subsequent runs, it is the latest value of "last_modified" that `dlt` stores in [state](https://dlthub.com/docs/general-usage/state). - -2. Incremental loading with the source `sql_database` - To achieve the same using the `sql_database` source, you would specify your cursor as follows: - - ```py - source = sql_database().with_resources("family") - #using the "last_modified" field as an incremental field using initial value of midnight January 1, 2024 - source.family.apply_hints(incremental=dlt.sources.incremental("updated"),initial_value=pendulum.DateTime(2024, 1, 1, 0, 0, 0)) - #running the pipeline - info = pipeline.run(source, write_disposition="merge") - print(info) - ``` - - :::info - * For merge write disposition, the source table needs a primary key, which `dlt` automatically sets up. - * `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appended, or replaced resources. - ::: - -### Parallelize extraction - -You can extract each table in a separate thread (no multiprocessing at this point). This will decrease loading time if your queries take time to execute or your network latency/speed is low. To enable this, declare your sources/resources as follows: -```py -database = sql_database().parallelize() -table = sql_table().parallelize() -``` - -### Column reflection -Columns and their data types are reflected with SQLAlchemy. The SQL types are then mapped to `dlt` types. -Most types are supported. - -The `reflection_level` argument controls how much information is reflected: - -- `reflection_level = "minimal"`: Only column names and nullability are detected. Data types are inferred from the data. -- `reflection_level = "full"`: Column names, nullability, and data types are detected. For decimal types we always add precision and scale. **This is the default.** -- `reflection_level = "full_with_precision"`: Column names, nullability, data types, and precision/scale are detected, also for types like text and binary. Integer sizes are set to bigint and to int for all other types. - -If the SQL type is unknown or not supported by `dlt`, then, in the pyarrow backend, the column will be skipped, whereas in the other backends the type will be inferred directly from the data irrespective of the `reflection_level` specified. In the latter case, this often means that some types are coerced to strings and `dataclass` based values from sqlalchemy are inferred as `complex` (JSON in most destinations). 
-:::tip -If you use reflection level **full** / **full_with_precision** you may encounter a situation where the data returned by sqlalchemy or pyarrow backend does not match the reflected data types. Most common symptoms are: -1. The destination complains that it cannot cast one type to another for a certain column. For example `connector-x` returns TIME in nanoseconds -and BigQuery sees it as bigint and fails to load. -2. You get `SchemaCorruptedException` or other coercion error during the `normalize` step. -In that case you may try **minimal** reflection level where all data types are inferred from the returned data. From our experience this prevents -most of the coercion problems. -::: - -You can also override the sql type by passing a `type_adapter_callback` function. This function takes a `sqlalchemy` data type as input and returns a new type (or `None` to force the column to be inferred from the data) as output. - -This is useful, for example, when: -- You're loading a data type which is not supported by the destination (e.g. you need JSON type columns to be coerced to string) -- You're using a sqlalchemy dialect which uses custom types that don't inherit from standard sqlalchemy types. -- For certain types you prefer `dlt` to infer data type from the data and you return `None` - -Example, when loading timestamps from Snowflake, you ensure that they get translated into standard sqlalchemy `timestamp` columns in the resultant schema: - -```py -import dlt -from snowflake.sqlalchemy import TIMESTAMP_NTZ -import sqlalchemy as sa - -def type_adapter_callback(sql_type): - if isinstance(sql_type, TIMESTAMP_NTZ): # Snowflake does not inherit from sa.DateTime - return sa.DateTime(timezone=True) - return sql_type # Use default detection for other types - -source = sql_database( - "snowflake://user:password@account/database?&warehouse=WH_123", - reflection_level="full", - type_adapter_callback=type_adapter_callback, - backend="pyarrow" -) - -dlt.pipeline("demo").run(source) -``` - -### Configuring with toml/environment variables -You can set most of the arguments of `sql_database()` and `sql_table()` directly in the `.toml` files and/or as environment variables. `dlt` automatically injects these values into the pipeline script. - -This is particularly useful with `sql_table()` because you can maintain a separate configuration for each table (below we show **secrets.toml** and **config.toml**, you are free to combine them into one): - -The examples below show how you can set arguments in any of the `.toml` files (`secrets.toml` or `config.toml`): -1. Specifying connection string: - ```toml - [sources.sql_database] - credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server" - ``` -2. Setting parameters like backend, chunk_size, and incremental column for the table `chat_message`: - ```toml - [sources.sql_database.chat_message] - backend="pandas" - chunk_size=1000 - - [sources.sql_database.chat_message.incremental] - cursor_path="updated_at" - ``` - This is especially useful with `sql_table()` in a situation where you may want to run this resource for multiple tables. Setting parameters like this would then give you a clean way of maintaing separate configurations for each table. - -3. Handling separate configurations for database and individual tables - When using the `sql_database()` source, you can separately configure the parameters for the database and for the individual tables. 
- ```toml - [sources.sql_database] - credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server" - schema="data" - backend="pandas" - chunk_size=1000 - - [sources.sql_database.chat_message.incremental] - cursor_path="updated_at" - ``` - - The resulting source created below will extract data using **pandas** backend with **chunk_size** 1000. The table **chat_message** will load data incrementally using **updated_at** column. All the other tables will not use incremental loading, and will instead load the full data. - - ```py - database = sql_database() - ``` - -You'll be able to configure all the arguments this way (except adapter callback function). [Standard dlt rules apply](https://dlthub.com/docs/general-usage/credentials/configuration#configure-dlt-sources-and-resources). - -It is also possible to set these arguments as environment variables [using the proper naming convention](https://dlthub.com/docs/general-usage/credentials/config_providers#toml-vs-environment-variables): -```sh -SOURCES__SQL_DATABASE__CREDENTIALS="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server" -SOURCES__SQL_DATABASE__BACKEND=pandas -SOURCES__SQL_DATABASE__CHUNK_SIZE=1000 -SOURCES__SQL_DATABASE__CHAT_MESSAGE__INCREMENTAL__CURSOR_PATH=updated_at -``` - -## Extended Usage - -### Running on Airflow -When running on Airflow: -1. Use the `dlt` [Airflow Helper](../../walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md#2-modify-dag-file) to create tasks from the `sql_database` source. You should be able to run table extraction in parallel with `parallel-isolated` source->DAG conversion. -2. Reflect tables at runtime with `defer_table_reflect` argument. -3. Set `allow_external_schedulers` to load data using [Airflow intervals](../../general-usage/incremental-loading.md#using-airflow-schedule-for-backfill-and-incremental-loading). - -### Transforming the data before load -You have direct access to the extracted data through the resource objects (`sql_table()` or `sql_database().with_resource())`), each of which represents a single SQL table. These objects are generators that yield -individual rows of the table which can be modified by using custom python functions. These functions can be applied to the resource using `add_map`. - - -Examples: -1. Pseudonymizing data to hide personally identifiable information (PII) before loading it to the destination. (See [here](https://dlthub.com/docs/general-usage/customising-pipelines/pseudonymizing_columns) for more information on pseudonymizing data with `dlt`) - - ```py - import hashlib - - def pseudonymize_name(doc): - ''' - Pseudonmyisation is a deterministic type of PII-obscuring - Its role is to allow identifying users by their hash, - without revealing the underlying info. - ''' - # add a constant salt to generate - salt = 'WI@N57%zZrmk#88c' - salted_string = doc['rfam_acc'] + salt - sh = hashlib.sha256() - sh.update(salted_string.encode()) - hashed_string = sh.digest().hex() - doc['rfam_acc'] = hashed_string - return doc - - pipeline = dlt.pipeline( - # Configure the pipeline - ) - # using sql_database source to load family table and pseudonymize the column "rfam_acc" - source = sql_database().with_resources("family") - # modify this source instance's resource - source = source.family.add_map(pseudonymize_name) - # Run the pipeline. 
For a large db this may take a while - info = pipeline.run(source, write_disposition="replace") - print(info) - ``` - -2. Excluding unnecessary columns before load - - ```py - def remove_columns(doc): - del doc["rfam_id"] - return doc - - pipeline = dlt.pipeline( - # Configure the pipeline - ) - # using sql_database source to load family table and remove the column "rfam_id" - source = sql_database().with_resources("family") - # modify this source instance's resource - source = source.family.add_map(remove_columns) - # Run the pipeline. For a large db this may take a while - info = pipeline.run(source, write_disposition="replace") - print(info) - ``` - - diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md new file mode 100644 index 0000000000..adb8bf80b8 --- /dev/null +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md @@ -0,0 +1,174 @@ +--- +title: Advanced +description: advance configuration and usage of the sql_database source +keywords: [sql connector, sql database pipeline, sql database] +--- + +import Header from '../_source-info-header.md'; + +# Advanced Usage + +

## Incremental Loading

Efficient data management often requires loading only new or updated data from your SQL databases, rather than reprocessing the entire dataset. This is where incremental loading comes into play.

Incremental loading uses a cursor column (e.g., a timestamp or an auto-incrementing ID) to load only data newer than a specified initial value, enhancing efficiency by reducing processing time and resource use. Read [here](https://dlthub.com/docs/walkthroughs/sql-incremental-configuration) for more details on incremental loading with `dlt`.


#### How to configure
1. **Choose a cursor column**: Identify a column in your SQL table that can serve as a reliable indicator of new or updated rows. Common choices include timestamp columns or auto-incrementing IDs.
1. **Set an initial value**: Choose a starting value for the cursor to begin loading data. This could be a specific timestamp or ID from which you wish to start loading data.
1. **Deduplication**: When using incremental loading, the system automatically handles the deduplication of rows based on the primary key (if available) or row hash for tables without a primary key.
1. **Set `end_value` for backfill**: Set `end_value` if you want to backfill data from a certain range.
1. **Order returned rows**: Set `row_order` to `asc` or `desc` to order the returned rows.

#### Examples

1. Incremental loading with the resource `sql_table`
    Consider a table "family" with a timestamp column `last_modified` that indicates when a row was last modified. To ensure that only rows modified after midnight (00:00:00) on January 1, 2024, are loaded, you would set the `last_modified` timestamp as the cursor as follows:
    ```py
    import dlt
    import pendulum
    from sql_database import sql_table

    # Example: incrementally load a table based on a timestamp column
    table = sql_table(
        table='family',
        incremental=dlt.sources.incremental(
            'last_modified', # Cursor column name
            initial_value=pendulum.DateTime(2024, 1, 1, 0, 0, 0) # Initial cursor value
        )
    )

    # Define the pipeline (replace the destination with your own)
    pipeline = dlt.pipeline(pipeline_name="rfam", destination="duckdb", dataset_name="rfam_data")

    info = pipeline.extract(table, write_disposition="merge")
    print(info)
    ```
    Behind the scenes, the loader generates a SQL query filtering rows with `last_modified` values greater than the incremental value. In the first run, this is the initial value (midnight (00:00:00) January 1, 2024).
    In subsequent runs, it is the latest value of `last_modified` that `dlt` stores in [state](https://dlthub.com/docs/general-usage/state).

2. Incremental loading with the source `sql_database`
    To achieve the same using the `sql_database` source, you would specify your cursor as follows:

    ```py
    source = sql_database().with_resources("family")
    # use the "last_modified" column as the incremental cursor, starting from midnight January 1, 2024
    source.family.apply_hints(
        incremental=dlt.sources.incremental("last_modified", initial_value=pendulum.DateTime(2024, 1, 1, 0, 0, 0))
    )
    # run the pipeline
    info = pipeline.run(source, write_disposition="merge")
    print(info)
    ```

    :::info
    * For merge write disposition, the source table needs a primary key, which `dlt` automatically sets up.
    * `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appended, or replaced resources.
+ ::: + +## Parallelized extraction + +You can extract each table in a separate thread (no multiprocessing at this point). This will decrease loading time if your queries take time to execute or your network latency/speed is low. To enable this, declare your sources/resources as follows: +```py +database = sql_database().parallelize() +table = sql_table().parallelize() +``` + +## Column reflection +Columns and their data types are reflected with SQLAlchemy. The SQL types are then mapped to `dlt` types. +Depending on the selected backend, some of the types might require additional processing. + +The `reflection_level` argument controls how much information is reflected: + +- `reflection_level = "minimal"`: Only column names and nullability are detected. Data types are inferred from the data. +- `reflection_level = "full"`: Column names, nullability, and data types are detected. For decimal types we always add precision and scale. **This is the default.** +- `reflection_level = "full_with_precision"`: Column names, nullability, data types, and precision/scale are detected, also for types like text and binary. Integer sizes are set to bigint and to int for all other types. + +If the SQL type is unknown or not supported by `dlt`, then, in the pyarrow backend, the column will be skipped, whereas in the other backends the type will be inferred directly from the data irrespective of the `reflection_level` specified. In the latter case, this often means that some types are coerced to strings and `dataclass` based values from sqlalchemy are inferred as `complex` (JSON in most destinations). +:::tip +If you use reflection level **full** / **full_with_precision** you may encounter a situation where the data returned by sqlalchemy or pyarrow backend does not match the reflected data types. Most common symptoms are: +1. The destination complains that it cannot cast one type to another for a certain column. For example `connector-x` returns TIME in nanoseconds +and BigQuery sees it as bigint and fails to load. +2. You get `SchemaCorruptedException` or other coercion error during the `normalize` step. +In that case you may try **minimal** reflection level where all data types are inferred from the returned data. From our experience this prevents +most of the coercion problems. +::: + +You can also override the sql type by passing a `type_adapter_callback` function. This function takes a `SQLAlchemy` data type as input and returns a new type (or `None` to force the column to be inferred from the data) as output. + +This is useful, for example, when: +- You're loading a data type which is not supported by the destination (e.g. you need JSON type columns to be coerced to string) +- You're using a sqlalchemy dialect which uses custom types that don't inherit from standard sqlalchemy types. 
- For certain types you prefer `dlt` to infer the data type from the data, in which case you return `None`

The following example shows how, when loading timestamps from Snowflake, you can make sure they are translated into standard sqlalchemy `timestamp` columns in the resulting schema:

```py
import dlt
from snowflake.sqlalchemy import TIMESTAMP_NTZ
import sqlalchemy as sa

def type_adapter_callback(sql_type):
    if isinstance(sql_type, TIMESTAMP_NTZ):  # Snowflake does not inherit from sa.DateTime
        return sa.DateTime(timezone=True)
    return sql_type  # Use default detection for other types

source = sql_database(
    "snowflake://user:password@account/database?&warehouse=WH_123",
    reflection_level="full",
    type_adapter_callback=type_adapter_callback,
    backend="pyarrow"
)

dlt.pipeline("demo").run(source)
```

## Configuring with toml/environment variables
You can set most of the arguments of `sql_database()` and `sql_table()` directly in the `.toml` files and/or as environment variables. `dlt` automatically injects these values into the pipeline script.

This is particularly useful with `sql_table()` because it lets you maintain a separate configuration for each table. The examples below show how you can set arguments in any of the `.toml` files (`secrets.toml` or `config.toml`); you are free to combine them into one file:

1. Specifying the connection string:
    ```toml
    [sources.sql_database]
    credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server"
    ```
2. Setting parameters like backend, chunk_size, and incremental column for the table `chat_message`:
    ```toml
    [sources.sql_database.chat_message]
    backend="pandas"
    chunk_size=1000

    [sources.sql_database.chat_message.incremental]
    cursor_path="updated_at"
    ```
    This is especially useful with `sql_table()` when you want to run the resource for multiple tables: setting parameters this way gives you a clean way of maintaining a separate configuration for each table.

3. Handling separate configurations for the database and individual tables
    When using the `sql_database()` source, you can configure the parameters for the database and for the individual tables separately.
    ```toml
    [sources.sql_database]
    credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server"
    schema="data"
    backend="pandas"
    chunk_size=1000

    [sources.sql_database.chat_message.incremental]
    cursor_path="updated_at"
    ```

    The source created below will extract data using the **pandas** backend with a **chunk_size** of 1000. The table **chat_message** will load data incrementally using the **updated_at** column. All the other tables will not use incremental loading and will instead load the full data.

    ```py
    database = sql_database()
    ```

You can configure all the arguments this way (except the adapter callback function). [Standard dlt rules apply](https://dlthub.com/docs/general-usage/credentials/configuration#configure-dlt-sources-and-resources).
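
For instance, with the per-table settings from point 2 above in place, a `sql_table()` resource for `chat_message` picks up the backend, chunk size, and incremental cursor without any of them appearing in the pipeline script. A minimal sketch (the `data` schema, pipeline name, and `duckdb` destination are placeholder choices):

```py
import dlt
from sql_database import sql_table

# backend="pandas", chunk_size=1000 and the "updated_at" cursor are injected
# from the [sources.sql_database.chat_message] section shown in point 2
table = sql_table(table="chat_message", schema="data")

pipeline = dlt.pipeline(
    pipeline_name="chat_message_pipeline",  # placeholder name
    destination="duckdb",                   # placeholder destination
    dataset_name="chat_data",
)
print(pipeline.run(table))
```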
+ +It is also possible to set these arguments as environment variables [using the proper naming convention](https://dlthub.com/docs/general-usage/credentials/config_providers#toml-vs-environment-variables): +```sh +SOURCES__SQL_DATABASE__CREDENTIALS="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server" +SOURCES__SQL_DATABASE__BACKEND=pandas +SOURCES__SQL_DATABASE__CHUNK_SIZE=1000 +SOURCES__SQL_DATABASE__CHAT_MESSAGE__INCREMENTAL__CURSOR_PATH=updated_at +``` diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/configuration.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/configuration.md new file mode 100644 index 0000000000..f77065fd79 --- /dev/null +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/configuration.md @@ -0,0 +1,292 @@ +--- +title: Configuring the SQL Database source +description: configuring the pipeline script, connection, and backend settings in the sql_database source +keywords: [sql connector, sql database pipeline, sql database] +--- + +import Header from '../_source-info-header.md'; + +# Configuration + +
+
+## Configuring the SQL Database source
+
+`dlt` sources are Python scripts made up of source and resource functions that can be easily customized. The SQL Database verified source has the following built-in source and resource:
+1. `sql_database`: a `dlt` source which can be used to load multiple tables and views from a SQL database
+2. `sql_table`: a `dlt` resource that loads a single table from the SQL database
+
+Read more about sources and resources here: [General usage: source](../../../general-usage/source.md) and [General usage: resource](../../../general-usage/resource.md).
+
+### Example usage:
+
+1. **Load all the tables from a database**
+    Calling `sql_database()` loads all tables from the database.
+
+    ```py
+    def load_entire_database() -> None:
+
+        # Define the pipeline
+        pipeline = dlt.pipeline(
+            pipeline_name="rfam",
+            destination='synapse',
+            dataset_name="rfam_data"
+        )
+
+        # Fetch all the tables from the database
+        source = sql_database()
+
+        # Run the pipeline
+        info = pipeline.run(source, write_disposition="replace")
+
+        # Print load info
+        print(info)
+    ```
+
+2. **Load select tables from a database**
+    Calling `sql_database().with_resources("family", "clan")` loads only the tables `"family"` and `"clan"` from the database.
+
+    ```py
+    def load_select_tables_from_database() -> None:
+
+        # Define the pipeline
+        pipeline = dlt.pipeline(
+            pipeline_name="rfam",
+            destination="postgres",
+            dataset_name="rfam_data"
+        )
+
+        # Fetch tables "family" and "clan"
+        source = sql_database().with_resources("family", "clan")
+
+        # Run the pipeline
+        info = pipeline.run(source)
+
+        # Print load info
+        print(info)
+
+    ```
+
+3. **Load a standalone table**
+    Calling `sql_table(table="family")` fetches only the table `"family"`.
+
+    ```py
+    def load_standalone_table() -> None:
+
+        # Define the pipeline
+        pipeline = dlt.pipeline(
+            pipeline_name="rfam",
+            destination="duckdb",
+            dataset_name="rfam_data"
+        )
+
+        # Fetch the table "family"
+        table = sql_table(table="family")
+
+        # Run the pipeline
+        info = pipeline.run(table)
+
+        # Print load info
+        print(info)
+
+    ```
+
+:::tip
+We intend our sources to be fully hackable. Feel free to change the source code of the sources and resources to customize them to your needs.
+:::
+
+
+## Configuring the connection
+
+### Connection string format
+`sql_database` uses SQLAlchemy to create database connections and reflect table schemas. You can pass credentials using
+[database urls](https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls), which have the general format:
+
+```py
+"dialect+driver://username:password@server:port/database_name"
+```
+
+For example, to connect to a MySQL database using the `pymysql` driver, you can use the following connection string:
+```py
+"mysql+pymysql://rfamro:PWD@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
+```
+
+Database-specific drivers can be passed into the connection string using query parameters. For example, to connect to Microsoft SQL Server using the ODBC Driver, you would need to pass the driver as a query parameter as follows:
+
+```py
+"mssql+pyodbc://username:password@server/database?driver=ODBC+Driver+17+for+SQL+Server"
+```
+
+
+### Passing connection credentials to the `dlt` pipeline
+
+There are several options for adding your connection credentials to your `dlt` pipeline:
+
+#### 1. 
Setting them in `secrets.toml` or as environment variables (Recommended)
+
+You can set up credentials using [any method](../../general-usage/credentials/setup#available-config-providers) supported by `dlt`. We recommend using `.dlt/secrets.toml` or environment variables. See Step 2 of the [setup](./setup) for how to set credentials inside `secrets.toml`. For more information on passing credentials, read [here](../../general-usage/credentials/setup).
+
+
+#### 2. Passing them directly in the script
+It is also possible to explicitly pass credentials inside the source. Example:
+```py
+from dlt.sources.credentials import ConnectionStringCredentials
+from sql_database import sql_table
+
+credentials = ConnectionStringCredentials(
+    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
+)
+
+table = sql_table(credentials, table="family")
+```
+
+:::note
+It is recommended to configure credentials in `.dlt/secrets.toml` and to not include any sensitive information in the pipeline code.
+:::
+
+### Other connection options
+#### Using SQLAlchemy Engine as credentials
+You can also pass an instance of an SQLAlchemy `Engine` instead of credentials:
+```py
+from sqlalchemy import create_engine
+
+engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")
+table = sql_table(engine, table="chat_message", schema="data")
+```
+This engine is used by `dlt` to open database connections and can work across multiple threads, so it is compatible with the `parallelize` setting of `dlt` sources and resources.
+
+
+## Configuring the backend
+
+Table backends convert streams of rows from database tables into batches in various formats. The default backend, `SQLAlchemy`, follows the standard `dlt` behavior of
+extracting and normalizing Python dictionaries. We recommend it for smaller tables, initial development work, and when minimal dependencies or a pure Python environment is required. This backend is also the slowest. The other backends make use of the structured data format of the tables and provide a significant improvement in speed. For example, the `PyArrow` backend converts rows into `Arrow` tables, which results in
+good performance and preserves exact data types. We recommend using this backend for larger tables.
+
+### SQLAlchemy
+
+The `SQLAlchemy` backend (the default) yields table data as a list of Python dictionaries. This data goes through the regular extract
+and normalize steps and does not require additional dependencies to be installed. It is the most robust (works with any destination, correctly represents data types) but also the slowest. You can set `reflection_level="full_with_precision"` to pass exact data types to the `dlt` schema.
+
+### PyArrow
+
+The `PyArrow` backend yields data as `Arrow` tables. It uses `SQLAlchemy` to read rows in batches but then immediately converts them into an `ndarray`, transposes it, and sets it as columns in an `Arrow` table. This backend always fully
+reflects the database table and preserves original types (i.e. **decimal** / **numeric** data will be extracted without loss of precision). If the destination loads parquet files, this backend will skip the `dlt` normalizer and you can gain a substantial (20x - 30x) speed increase.
+
+Note that if `pandas` is installed, we'll use it to convert `SQLAlchemy` tuples into `ndarray`, as it seems to be 20-30% faster than using `numpy` directly.
+
+```py
+import sqlalchemy as sa
+pipeline = dlt.pipeline(
+    pipeline_name="rfam_cx", destination="postgres", dataset_name="rfam_data_arrow"
+)
+
+def _double_as_decimal_adapter(table: sa.Table) -> None:
+    """Disables the MySQL double -> decimal conversion, so doubles are emitted as floats."""
+    for column in table.columns.values():
+        if isinstance(column.type, sa.Float):
+            column.type.asdecimal = False
+
+sql_alchemy_source = sql_database(
+    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam?&binary_prefix=true",
+    backend="pyarrow",
+    table_adapter_callback=_double_as_decimal_adapter
+).with_resources("family", "genome")
+
+info = pipeline.run(sql_alchemy_source)
+print(info)
+```
+
+### pandas
+
+The `pandas` backend yields data as DataFrames using the `pandas.io.sql` module. `dlt` uses `PyArrow` dtypes by default as they generate more stable typing.
+
+With the default settings, several data types will be coerced to dtypes in the yielded data frame:
+* **decimal** is mapped to double, so it is possible to lose precision
+* **date** and **time** are mapped to strings
+* all types are nullable
+
+:::note
+`dlt` will still use the data types reflected from the source database when creating destination tables. How the type differences resulting from the `pandas` backend are reconciled / parsed is up to the destination. Most destinations will be able to parse date/time strings and convert doubles into decimals (note that you'll still lose precision on decimals with the default settings). **However, we strongly suggest
+not using the** `pandas` **backend if your source tables contain date, time, or decimal columns.**
+:::
+
+Internally, `dlt` uses `pandas.io.sql._wrap_result` to generate `pandas` frames. To adjust [pandas-specific settings](https://pandas.pydata.org/docs/reference/api/pandas.read_sql_table.html), pass them in the `backend_kwargs` parameter. For example, below we set `coerce_float` to `False`:
+
+```py
+import sqlalchemy as sa
+pipeline = dlt.pipeline(
+    pipeline_name="rfam_cx", destination="postgres", dataset_name="rfam_data_pandas_2"
+)
+
+def _double_as_decimal_adapter(table: sa.Table) -> None:
+    """Emits decimals instead of floats."""
+    for column in table.columns.values():
+        if isinstance(column.type, sa.Float):
+            column.type.asdecimal = True
+
+sql_alchemy_source = sql_database(
+    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam?&binary_prefix=true",
+    backend="pandas",
+    table_adapter_callback=_double_as_decimal_adapter,
+    chunk_size=100000,
+    # set coerce_float to False to keep decimal values as Decimal objects instead of floats
+    backend_kwargs={"coerce_float": False, "dtype_backend": "numpy_nullable"},
+).with_resources("family", "genome")
+
+info = pipeline.run(sql_alchemy_source)
+print(info)
+```
+
+### ConnectorX
+The [`ConnectorX`](https://sfu-db.github.io/connector-x/intro.html) backend completely skips `SQLAlchemy` when reading table rows, in favor of doing that in Rust. This is claimed to be significantly faster than any other method (validated only on postgres). With the default settings it will emit `PyArrow` tables, but you can configure this by specifying the `return_type` in `backend_kwargs`. (See the [`ConnectorX` docs](https://sfu-db.github.io/connector-x/api.html) for a full list of configurable parameters.)
+
+There are certain limitations when using this backend:
+* it will ignore `chunk_size`. `ConnectorX` cannot yield data in batches.
+* in many cases it requires a connection string that differs from the `SQLAlchemy` connection string. Use the `conn` argument in `backend_kwargs` to set this.
+* it will convert **decimals** to **doubles**, so you will lose precision.
+* nullability of the columns is ignored (always true).
+* it uses different mappings for each data type. (Check [here](https://sfu-db.github.io/connector-x/databases.html) for more details.)
+* JSON fields (at least those coming from postgres) are double-wrapped in strings. To unwrap this, you can pass the in-built transformation function `unwrap_json_connector_x` (for example, with `add_map`):
+
+    ```py
+    from sources.sql_database.helpers import unwrap_json_connector_x
+    ```
+
+:::note
+`dlt` will still use the data types reflected from the source database when creating destination tables. It is up to the destination to reconcile / parse type differences. Please note that you'll still lose precision on decimals with default settings.
+:::
+
+```py
+"""This example is taken from the benchmarking tests for ConnectorX performed on the UNSW_Flow dataset (~2mln rows, 25+ columns). Full code here: https://github.com/dlt-hub/sql_database_benchmarking"""
+import os
+from dlt.destinations import filesystem
+
+unsw_table = sql_table(
+    "postgresql://loader:loader@localhost:5432/dlt_data",
+    "unsw_flow_7",
+    "speed_test",
+    # this is ignored by connectorx
+    chunk_size=100000,
+    backend="connectorx",
+    # keep source data types
+    reflection_level="full_with_precision",
+    # just to demonstrate how to set up a separate connection string for connectorx
+    backend_kwargs={"conn": "postgresql://loader:loader@localhost:5432/dlt_data"}
+)
+
+pipeline = dlt.pipeline(
+    pipeline_name="unsw_download",
+    destination=filesystem(os.path.abspath("../_storage/unsw")),
+    progress="log",
+    dev_mode=True,
+)
+
+info = pipeline.run(
+    unsw_table,
+    dataset_name="speed_test",
+    table_name="unsw_flow",
+    loader_file_format="parquet",
+)
+print(info)
+```
+With the dataset above and a local postgres instance, the `ConnectorX` backend is 2x faster than the `PyArrow` backend.
diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/index.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/index.md
new file mode 100644
index 0000000000..0f65fe7816
--- /dev/null
+++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/index.md
@@ -0,0 +1,51 @@
+---
+title: 30+ SQL Databases
+description: dlt pipeline for SQL Database
+keywords: [sql connector, sql database pipeline, sql database]
+---
+import Header from '../_source-info-header.md';
+
+# 30+ SQL Databases
+
+ +SQL databases are management systems (DBMS) that store data in a structured format, commonly used +for efficient and reliable data retrieval. + +The SQL Database verified source loads data to your specified destination using one of the following backends: SQLAlchemy, PyArrow, pandas, or ConnectorX. + +Sources and resources that can be loaded using this verified source are: + +| Name | Description | +| ------------ | -------------------------------------------------------------------- | +| sql_database | Reflects the tables and views in SQL database and retrieves the data | +| sql_table | Retrieves data from a particular SQL database table | +| | | + +:::tip +If you prefer to skip the tutorial and see the code example right away, check out the pipeline example [here](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py). +::: + +### Supported databases + +We support all [SQLAlchemy dialects](https://docs.sqlalchemy.org/en/20/dialects/), which include, but are not limited to, the following database engines: + +* [PostgreSQL](./troubleshooting#postgres--mssql) +* [MySQL](./troubleshooting#mysql) +* SQLite +* [Oracle](./troubleshooting#oracle) +* [Microsoft SQL Server](./troubleshooting#postgres--mssql) +* MariaDB +* [IBM DB2 and Informix](./troubleshooting#db2) +* Google BigQuery +* Snowflake +* Redshift +* Apache Hive and Presto +* SAP Hana +* CockroachDB +* Firebird +* Teradata Vantage + +:::note +Note that there many unofficial dialects, such as [DuckDB](https://duckdb.org/). +::: \ No newline at end of file diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/setup.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/setup.md new file mode 100644 index 0000000000..f00d0f696e --- /dev/null +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/setup.md @@ -0,0 +1,76 @@ +--- +title: Setup +description: basic steps for setting up a dlt pipeline for SQL Database +keywords: [sql connector, sql database pipeline, sql database] +--- + +import Header from '../_source-info-header.md'; + +# Setup + +
+ +To connect to your SQL database using `dlt` follow these steps: + +1. Initialize a `dlt` project in the current working directory by running the following command: + + ```sh + dlt init sql_database duckdb + ``` + + This will add necessary files and configurations for a `dlt` pipeline with SQL database as the [source](../../general-usage/source) and + [DuckDB](../../destinations/duckdb.md) as the [destination](../../destinations). + +:::tip +If you'd like to use a different destination, simply replace `duckdb` with the name of your preferred [destination](../destinations). +::: + +2. Add credentials for your SQL database + + To connect to your SQL database, `dlt` would need to authenticate using necessary credentials. To enable this, paste your credentials in the `secrets.toml` file created inside the `.dlt/` folder in the following format: + ```toml + [sources.sql_database.credentials] + drivername = "mysql+pymysql" # driver name for the database + database = "Rfam" # database name + username = "rfamro" # username associated with the database + host = "mysql-rfam-public.ebi.ac.uk" # host address + port = "4497" # port required for connection + ``` + + Alternatively, you can also authenticate using connection strings: + ```toml + [sources.sql_database.credentials] + credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam" + ``` + + To learn more about how to add credentials into your `sql_database` pipeline see [here](./configuration#configuring-the-connection). + +3. Add credentials for your destination (if necessary) + + Depending on which [destination](../../destinations) you're loading into, you might also need to add your destination credentials. For more information read the [General Usage: Credentials.](../../../general-usage/credentials) + +4. Install any necessary dependencies + + ```sh + pip install -r requirements.txt + ``` + +5. Run the pipeline + + ```sh + python sql_database_pipeline.py + ``` + + Executing this command will run the example script `sql_database_pipeline.py` created in step 1. In order for this to run successfully you will need to pass the names of the databases and/or tables you wish to load. + See the [section on configuring the sql_database source](./configuration#configuring-the-sql-database-source) for more details. + + +6. Make sure everything is loaded as expected with + ```sh + dlt pipeline show + ``` + + :::note + The pipeline_name for the above example is `rfam`, you may also use any + custom name instead. + ::: \ No newline at end of file diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/troubleshooting.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/troubleshooting.md new file mode 100644 index 0000000000..33986fb5a6 --- /dev/null +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/troubleshooting.md @@ -0,0 +1,89 @@ +--- +title: Troubleshooting +description: common troubleshooting use-cases for the sql_database source +keywords: [sql connector, sql database pipeline, sql database] +--- + +import Header from '../_source-info-header.md'; + +# Troubleshooting + +
+ +## Troubleshooting connection + +#### Connecting to MySQL with SSL +Here, we use the `mysql` and `pymysql` dialects to set up an SSL connection to a server, with all information taken from the [SQLAlchemy docs](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#ssl-connections). + +1. To enforce SSL on the client without a client certificate you may pass the following DSN: + + ```toml + sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca=" + ``` + +1. You can also pass the server's public certificate (potentially bundled with your pipeline) and disable host name checks: + + ```toml + sources.sql_database.credentials="mysql+pymysql://root:@:3306/mysql?ssl_ca=server-ca.pem&ssl_check_hostname=false" + ``` + +1. For servers requiring a client certificate, provide the client's private key (a secret value). In Airflow, this is usually saved as a variable and exported to a file before use. The server certificate is omitted in the example below: + + ```toml + sources.sql_database.credentials="mysql+pymysql://root:@35.203.96.191:3306/mysql?ssl_ca=&ssl_cert=client-cert.pem&ssl_key=client-key.pem" + ``` + +#### SQL Server connection options + +**To connect to an `mssql` server using Windows authentication**, include `trusted_connection=yes` in the connection string. + +```toml +sources.sql_database.credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server" +``` + +**To connect to a local sql server instance running without SSL** pass `encrypt=no` parameter: +```toml +sources.sql_database.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?encrypt=no&driver=ODBC+Driver+17+for+SQL+Server" +``` + +**To allow self signed SSL certificate** when you are getting `certificate verify failed:unable to get local issuer certificate`: +```toml +sources.sql_database.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?TrustServerCertificate=yes&driver=ODBC+Driver+17+for+SQL+Server" +``` + +**To use long strings (>8k) and avoid collation errors**: +```toml +sources.sql_database.credentials="mssql+pyodbc://loader:loader@localhost/dlt_data?LongAsMax=yes&driver=ODBC+Driver+17+for+SQL+Server" +``` + +## Troubleshooting backends + +### Notes on specific databases + +#### Oracle +1. When using the `oracledb` dialect in thin mode we are getting protocol errors. Use thick mode or `cx_oracle` (old) client. +2. Mind that `SQLAlchemy` translates Oracle identifiers into lower case! Keep the default `dlt` naming convention (`snake_case`) when loading data. We'll support more naming conventions soon. +3. `Connectorx` is for some reason slower for Oracle than the `PyArrow` backend. + +See [here](https://github.com/dlt-hub/sql_database_benchmarking/tree/main/oracledb#installing-and-setting-up-oracle-db) for information and code on setting up and benchmarking on Oracle. + +#### DB2 +1. Mind that `SQLAlchemy` translates DB2 identifiers into lower case! Keep the default `dlt` naming convention (`snake_case`) when loading data. We'll support more naming conventions soon. +2. The DB2 type `DOUBLE` gets incorrectly mapped to the python type `float` (instead of the `SqlAlchemy` type `Numeric` with default precision). This requires `dlt` to perform additional casts. The cost of the cast, however, is minuscule compared to the cost of reading rows from database. 
+
+See [here](https://github.com/dlt-hub/sql_database_benchmarking/tree/main/db2#installing-and-setting-up-db2) for information and code on setting up and benchmarking on DB2.
+
+#### MySQL
+1. The `SQLAlchemy` dialect converts doubles to decimals. (This can be disabled via the table adapter argument, as shown in the code example [here](./configuration#pyarrow).)
+
+#### Postgres / MSSQL
+No issues were found for these databases. Postgres is the only database where we observed a 2x speedup with `ConnectorX` (see [here](https://github.com/dlt-hub/sql_database_benchmarking/tree/main/postgres) for the benchmarking code). On other database systems it performs the same as (or sometimes worse than) the `PyArrow` backend.
+
+### Notes on specific data types
+
+#### JSON
+
+In the `SQLAlchemy` backend, the JSON data type is represented as a Python object, and in the `PyArrow` backend it is represented as a JSON string. At present, it does not work correctly with the `pandas` and `ConnectorX` backends, which cast Python objects to `str`, generating invalid JSON strings that cannot be loaded into the destination.
+
+#### UUID
+UUIDs are represented as strings by default. You can change this behavior by using `table_adapter_callback` to modify the properties of the UUID type for a particular column. (See the code example [here](./configuration#pyarrow) for how to modify the data type properties of a particular column.)
\ No newline at end of file
diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/usage.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/usage.md
new file mode 100644
index 0000000000..ee70e92ea0
--- /dev/null
+++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/usage.md
@@ -0,0 +1,102 @@
+---
+title: Usage
+description: basic usage of the sql_database source
+keywords: [sql connector, sql database pipeline, sql database]
+---
+
+import Header from '../_source-info-header.md';
+
+# Usage
+
+
+## Applying column-wise filtering on the data being ingested
+
+By default, the existing source and resource functions, `sql_database` and `sql_table`, ingest all of the records from the source table. But by using `query_adapter_callback`, it is possible to pass a `WHERE` clause inside the underlying `SELECT` statement using the [SQLAlchemy syntax](https://docs.sqlalchemy.org/en/14/core/selectable.html#). This enables filtering the data based on specific columns before extraction.
+
+The example below uses `query_adapter_callback` to filter on the column `customer_id` for the table `orders`:
+
+```py
+def query_adapter_callback(query, table):
+    if table.name == "orders":
+        # Only select rows where the column customer_id has value 1
+        return query.where(table.c.customer_id==1)
+    # Use the original query for other tables
+    return query
+
+source = sql_database(
+    query_adapter_callback=query_adapter_callback
+).with_resources("orders")
+```
+
+## Transforming the data before load
+You have direct access to the extracted data through the resource objects (`sql_table()` or `sql_database().with_resources()`), each of which represents a single SQL table. These objects are generators that yield
+individual rows of the table, which can be modified using custom Python functions. These functions can be applied to the resource using `add_map`.
+
+:::note
+The PyArrow backend does not yield individual rows; rather, it loads chunks of data as an `ndarray`. In this case, the transformation function passed to `add_map` should be configured to expect an `ndarray` input.
+:::
+
+
+Examples:
+1. Pseudonymizing data to hide personally identifiable information (PII) before loading it to the destination. (See [here](https://dlthub.com/docs/general-usage/customising-pipelines/pseudonymizing_columns) for more information on pseudonymizing data with `dlt`.)
+
+    ```py
+    import hashlib
+
+    def pseudonymize_name(doc):
+        '''
+        Pseudonymization is a deterministic type of PII-obscuring.
+        Its role is to allow identifying users by their hash,
+        without revealing the underlying info.
+        '''
+        # add a constant salt to generate deterministic hashes
+        salt = 'WI@N57%zZrmk#88c'
+        salted_string = doc['rfam_acc'] + salt
+        sh = hashlib.sha256()
+        sh.update(salted_string.encode())
+        hashed_string = sh.digest().hex()
+        doc['rfam_acc'] = hashed_string
+        return doc
+
+    pipeline = dlt.pipeline(
+        # Configure the pipeline
+    )
+    # using sql_database source to load family table and pseudonymize the column "rfam_acc"
+    source = sql_database().with_resources("family")
+    # modify this source instance's resource
+    source = source.family.add_map(pseudonymize_name)
+    # Run the pipeline. For a large db this may take a while
+    info = pipeline.run(source, write_disposition="replace")
+    print(info)
+    ```
+
+2. Excluding unnecessary columns before load
+
+    ```py
+    def remove_columns(doc):
+        del doc["rfam_id"]
+        return doc
+
+    pipeline = dlt.pipeline(
+        # Configure the pipeline
+    )
+    # using sql_database source to load family table and remove the column "rfam_id"
+    source = sql_database().with_resources("family")
+    # modify this source instance's resource
+    source = source.family.add_map(remove_columns)
+    # Run the pipeline. 
For a large db this may take a while + info = pipeline.run(source, write_disposition="replace") + print(info) + ``` + +## Deploying the sql_database pipeline + +You can deploy the `sql_database` pipeline with any of the `dlt` deployment methods, such as [GitHub Actions](https://dlthub.com/docs/walkthroughs/deploy-a-pipeline/deploy-with-github-actions), [Airflow](https://dlthub.com/docs/walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer), [Dagster](https://dlthub.com/docs/walkthroughs/deploy-a-pipeline/deploy-with-dagster) etc. See [here](https://dlthub.com/docs/walkthroughs/deploy-a-pipeline) for a full list of deployment methods. + +### Running on Airflow +When running on Airflow: +1. Use the `dlt` [Airflow Helper](../../../walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md#2-modify-dag-file) to create tasks from the `sql_database` source. (If you want to run table extraction in parallel, then you can do this by setting `decompose = "parallel-isolated"` when doing the source->DAG conversion. See [here](https://dlthub.com/docs/walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer#2-modify-dag-file) for code example.) +2. Reflect tables at runtime with `defer_table_reflect` argument. +3. Set `allow_external_schedulers` to load data using [Airflow intervals](../../../general-usage/incremental-loading.md#using-airflow-schedule-for-backfill-and-incremental-loading). + diff --git a/docs/website/sidebars.js b/docs/website/sidebars.js index 465212cae6..29ba5e1082 100644 --- a/docs/website/sidebars.js +++ b/docs/website/sidebars.js @@ -86,7 +86,21 @@ const sidebars = { 'dlt-ecosystem/verified-sources/salesforce', 'dlt-ecosystem/verified-sources/scrapy', 'dlt-ecosystem/verified-sources/shopify', - 'dlt-ecosystem/verified-sources/sql_database', + { + type: 'category', + label: '30+ SQL Databases', + link: { + type: 'doc', + id: 'dlt-ecosystem/verified-sources/sql_database/index', + }, + items: [ + 'dlt-ecosystem/verified-sources/sql_database/setup', + 'dlt-ecosystem/verified-sources/sql_database/configuration', + 'dlt-ecosystem/verified-sources/sql_database/usage', + 'dlt-ecosystem/verified-sources/sql_database/troubleshooting', + 'dlt-ecosystem/verified-sources/sql_database/advanced' + ] + }, 'dlt-ecosystem/verified-sources/slack', 'dlt-ecosystem/verified-sources/strapi', 'dlt-ecosystem/verified-sources/stripe', From 08a2eb39c9d479ff6600c9547ab642b6933885fd Mon Sep 17 00:00:00 2001 From: rahuljo Date: Tue, 10 Sep 2024 16:31:14 +0200 Subject: [PATCH 09/13] updating broken links --- .../deploy-a-pipeline/deploy-with-airflow-composer.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md b/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md index ce76240c8a..8dd86c5172 100644 --- a/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md +++ b/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md @@ -216,7 +216,7 @@ load_data() ) ``` :::tip -When you run `load_data` DAG above, Airflow will call `source` function every 30 seconds (by default) to be able to monitor the tasks. Make sure that your source function does not do any long lasting operations ie. reflecting source database. In case of [sql_database](../../dlt-ecosystem/verified-sources/sql_database.md) we added an option to delay database reflection until data is accessed by a resource. 
+When you run `load_data` DAG above, Airflow will call `source` function every 30 seconds (by default) to be able to monitor the tasks. Make sure that your source function does not do any long lasting operations ie. reflecting source database. In case of [sql_database](../../dlt-ecosystem/verified-sources/sql_database/index.md) we added an option to delay database reflection until data is accessed by a resource. ::: ### 3. Import sources and move the relevant code from the pipeline script From bf3cab604d70831b21023a80f84cbd513e028403 Mon Sep 17 00:00:00 2001 From: rahuljo Date: Wed, 11 Sep 2024 10:10:41 +0200 Subject: [PATCH 10/13] removing problematic relative paths --- .../verified-sources/sql_database/configuration.md | 2 +- .../dlt-ecosystem/verified-sources/sql_database/setup.md | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/configuration.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/configuration.md index f77065fd79..88ea268378 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/configuration.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/configuration.md @@ -124,7 +124,7 @@ There are several options for adding your connection credentials into your `dlt` #### 1. Setting them in `secrets.toml` or as environment variables (Recommended) -You can set up credentials using [any method](../../general-usage/credentials/setup#available-config-providers) supported by `dlt`. We recommend using `.dlt/secrets.toml` or the environment variables. See Step 2 of the [setup](./setup) for how to set credentials inside `secrets.toml`. For more information on passing credentials read [here](../../general-usage/credentials/setup). +You can set up credentials using [any method](https://dlthub.com/docs/devel/general-usage/credentials/setup#available-config-providers) supported by `dlt`. We recommend using `.dlt/secrets.toml` or the environment variables. See Step 2 of the [setup](./setup) for how to set credentials inside `secrets.toml`. For more information on passing credentials read [here](https://dlthub.com/docs/devel/general-usage/credentials/setup). #### 2. Passing them directly in the script diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/setup.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/setup.md index f00d0f696e..a91ae40028 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/setup.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/setup.md @@ -18,11 +18,11 @@ To connect to your SQL database using `dlt` follow these steps: dlt init sql_database duckdb ``` - This will add necessary files and configurations for a `dlt` pipeline with SQL database as the [source](../../general-usage/source) and - [DuckDB](../../destinations/duckdb.md) as the [destination](../../destinations). + This will add necessary files and configurations for a `dlt` pipeline with SQL database as the source and + [DuckDB](../../destinations/duckdb.md) as the destination. :::tip -If you'd like to use a different destination, simply replace `duckdb` with the name of your preferred [destination](../destinations). +If you'd like to use a different destination, simply replace `duckdb` with the name of your preferred [destination](../../destinations). ::: 2. 
Add credentials for your SQL database From c7e6221955218d5bc6c9ad8ab6fc21fa66a36138 Mon Sep 17 00:00:00 2001 From: rahuljo Date: Wed, 11 Sep 2024 16:14:42 +0200 Subject: [PATCH 11/13] small formatting and language change + adding a line about column reflection --- .../verified-sources/sql_database/advanced.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md index adb8bf80b8..c9126142dd 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md @@ -27,7 +27,7 @@ certain range. #### Examples -1. Incremental loading with the resource `sql_table` +**1. Incremental loading with the resource `sql_table`** Consider a table "family" with a timestamp column `last_modified` that indicates when a row was last modified. To ensure that only rows modified after midnight (00:00:00) on January 1, 2024, are loaded, you would set `last_modified` timestamp as the cursor as follows: ```py from sql_database import sql_table @@ -48,7 +48,7 @@ certain range. Behind the scene, the loader generates a SQL query filtering rows with `last_modified` values greater than the incremental value. In the first run, this is the initial value (midnight (00:00:00) January 1, 2024). In subsequent runs, it is the latest value of `last_modified` that `dlt` stores in [state](https://dlthub.com/docs/general-usage/state). -2. Incremental loading with the source `sql_database` +**2. Incremental loading with the source `sql_database`** To achieve the same using the `sql_database` source, you would specify your cursor as follows: ```py @@ -61,7 +61,7 @@ certain range. ``` :::info - * For merge write disposition, the source table needs a primary key, which `dlt` automatically sets up. + * When using "merge" write disposition, the source table needs a primary key, which `dlt` automatically sets up. * `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appended, or replaced resources. ::: @@ -74,7 +74,7 @@ table = sql_table().parallelize() ``` ## Column reflection -Columns and their data types are reflected with SQLAlchemy. The SQL types are then mapped to `dlt` types. +Column reflection is the automatic detection and retrieval of column metadata like column names, constraints, data types etc. Columns and their data types are reflected with SQLAlchemy. The SQL types are then mapped to `dlt` types. Depending on the selected backend, some of the types might require additional processing. 
The `reflection_level` argument controls how much information is reflected: From ac746883ade6a04d1533e9cd55676d821c0fa64d Mon Sep 17 00:00:00 2001 From: akelad Date: Thu, 12 Sep 2024 13:36:34 +0200 Subject: [PATCH 12/13] fix outdated info --- .../dlt-ecosystem/verified-sources/sql_database/advanced.md | 4 ++-- .../docs/dlt-ecosystem/verified-sources/sql_database/index.md | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md index c9126142dd..7ff08f8095 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md @@ -54,7 +54,7 @@ certain range. ```py source = sql_database().with_resources("family") #using the "last_modified" field as an incremental field using initial value of midnight January 1, 2024 - source.family.apply_hints(incremental=dlt.sources.incremental("updated"),initial_value=pendulum.DateTime(2024, 1, 1, 0, 0, 0)) + source.family.apply_hints(incremental=dlt.sources.incremental("updated", initial_value=pendulum.DateTime(2022, 1, 1, 0, 0, 0))) #running the pipeline info = pipeline.run(source, write_disposition="merge") print(info) @@ -83,7 +83,7 @@ The `reflection_level` argument controls how much information is reflected: - `reflection_level = "full"`: Column names, nullability, and data types are detected. For decimal types we always add precision and scale. **This is the default.** - `reflection_level = "full_with_precision"`: Column names, nullability, data types, and precision/scale are detected, also for types like text and binary. Integer sizes are set to bigint and to int for all other types. -If the SQL type is unknown or not supported by `dlt`, then, in the pyarrow backend, the column will be skipped, whereas in the other backends the type will be inferred directly from the data irrespective of the `reflection_level` specified. In the latter case, this often means that some types are coerced to strings and `dataclass` based values from sqlalchemy are inferred as `complex` (JSON in most destinations). +If the SQL type is unknown or not supported by `dlt`, then, in the pyarrow backend, the column will be skipped, whereas in the other backends the type will be inferred directly from the data irrespective of the `reflection_level` specified. In the latter case, this often means that some types are coerced to strings and `dataclass` based values from sqlalchemy are inferred as `json` (JSON in most destinations). :::tip If you use reflection level **full** / **full_with_precision** you may encounter a situation where the data returned by sqlalchemy or pyarrow backend does not match the reflected data types. Most common symptoms are: 1. The destination complains that it cannot cast one type to another for a certain column. 
For example `connector-x` returns TIME in nanoseconds diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/index.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/index.md index 0f65fe7816..c3d8517052 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/index.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/index.md @@ -1,6 +1,6 @@ --- title: 30+ SQL Databases -description: dlt pipeline for SQL Database +description: PostgreSQL, MySQL, MS SQL Server, BigQuery, Redshift, and more keywords: [sql connector, sql database pipeline, sql database] --- import Header from '../_source-info-header.md'; From f17416d1bb95f8d758d0af1a6f8ff9a4376f7793 Mon Sep 17 00:00:00 2001 From: akelad Date: Thu, 12 Sep 2024 13:46:29 +0200 Subject: [PATCH 13/13] fix description --- .../docs/dlt-ecosystem/verified-sources/sql_database/index.md | 2 +- docs/website/sidebars.js | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/index.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/index.md index c3d8517052..a8146c75fe 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/index.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/index.md @@ -1,6 +1,6 @@ --- title: 30+ SQL Databases -description: PostgreSQL, MySQL, MS SQL Server, BigQuery, Redshift, and more +description: PostgreSQL, MySQL, MS SQL, BigQuery, Redshift, and more keywords: [sql connector, sql database pipeline, sql database] --- import Header from '../_source-info-header.md'; diff --git a/docs/website/sidebars.js b/docs/website/sidebars.js index fd8d494acd..9fd256ec63 100644 --- a/docs/website/sidebars.js +++ b/docs/website/sidebars.js @@ -93,6 +93,7 @@ const sidebars = { { type: 'category', label: '30+ SQL Databases', + description: 'PostgreSQL, MySQL, MS SQL, BigQuery, Redshift, and more', link: { type: 'doc', id: 'dlt-ecosystem/verified-sources/sql_database/index',