
Epic: Database table/non-file dependencies #9945

Closed
3 of 7 tasks
dberenbaum opened this issue Sep 14, 2023 · 25 comments

Labels
epic Umbrella issue (high level). Include here: list of tasks/PRs, details, Qs, timelines, etc p1-important Important, aka current backlog of things to do

@dberenbaum
Collaborator

dberenbaum commented Sep 14, 2023

Update

See updated proposal below.

Original proposal (outdated)

Summary / Background

Tables/datasets from databases and similar systems can be specified as dependencies in dvc pipelines.

Related:

Scope

  • user should be able to specify a table name and a database connection as a dvc pipeline dependency in dvc.yaml
  • dvc should record a unique id in dvc.lock (version id, table hash, or however table contents can be uniquely identified in the system)
  • user can check if the table has changed since they last ran the query
  • user can freeze the stage if they don't want to always check for the latest version of the table

Assumptions

  • versioning table-level dependencies is granular enough for now -- the user can specify both the sql query script and the table names as dependencies, which should be enough for most use cases (no need for dvc to know anything about the query)
  • some unique content identifier for a table can be identified in the database

Open Questions

  • Do we need to build a separate integration for every database or can we make it more generic?
  • What fields are needed in dvc.yaml and dvc.lock?
  • Should database connection info be inside dvc.yaml (and dvc.lock?) or stored in config file for reuse?
  • Which databases to prioritize?

Blockers / Dependencies

  • DVCX should be one of the backends to prioritize but may not be ready yet

General Approach

Example:

# dvc.yaml
stages:
  train:
    cmd: python train.py
    deps:
      - train.py
      - customer_info:
          type: delta-lake
          conn_info: ...
    outs:
      - model
# dvc.lock
train:
  cmd: python train.py
  deps:
    - path: train.py
      hash: md5
      md5: 324001573ed724e5ae092226fcf9ca30
      size: 1666
    - name: customer_info
      type: delta-lake
      version: 2
  outs:
    - path: model
      hash: md5
      md5: f9ee137c1772f7fb21b8f52c8a72c8ac
      size: 1957931

Steps

Must have (p1)

  • Research into databases and how much we can generalize
  • 1-3 traditional databases/data warehouses (postgresql, sql server, mysql, oracle, snowflake, redshift, bigquery)
  • Delta Lake tables
  • DVCX datasets #10114

Optional / followup (p2)

  • Generalized support for callback dependencies
  • Database tables as outputs: A pipeline stage may take tables and a query script as dependencies and generate a new table as an output. DVC can track that table as a (non-cached) stage output.
  • Import: Be able to import/instantiate the table to files.

Timelines

TBD by the assignee

@dberenbaum dberenbaum added epic Umbrella issue (high level). Include here: list of tasks/PRs, details, Qs, timelines, etc p1-important Important, aka current backlog of things to do labels Sep 14, 2023
@dberenbaum
Collaborator Author

  • Delta Lake tables

This can be limited to databricks rather than a generic connection to delta lake. https://docs.databricks.com/en/dev-tools/index.html might be helpful to research.

@skshetry
Member

Also related: #2378.

@skshetry skshetry self-assigned this Sep 25, 2023
@dberenbaum
Collaborator Author

Edited to add generalized support for callback deps as a p2 item

@skshetry
Member

skshetry commented Sep 28, 2023

Snowflake has a LAST_ALTERED column that can be used:

LAST_ALTERED TIMESTAMP_LTZ Date and time when the table was last altered by a DDL or DML operation.

For views and external tables, even though they have LAST_ALTERED, it only records the timestamp when the structure was changed. So this approach is only going to work for regular tables.

use schema snowflake_sample_data.tpcds_sf100tcl;
SELECT LAST_ALTERED 
FROM snowflake_sample_data.INFORMATION_SCHEMA.TABLES 
WHERE TABLE_NAME = 'CUSTOMER' and TABLE_SCHEMA = 'TPCDS_SF100TCL';

Snowflake also has a time-travel feature where you can query using the AT/BEFORE SQL extensions. Data retention is set to 1 day by default for all accounts, and it can only be extended (up to a maximum of 90 days) on an Enterprise account.

-- show data from 5 minutes ago
SELECT * FROM customer_table AT(OFFSET => -60*5);

We can also use hash_agg(*), which computes a single hash for the whole table. It seems to be used for checking data consistency, verifying the equivalence of different tables, and re-triggering pipelines [citation required].
It seemed fast enough to me on the Snowflake sample data, but I am not sure how fast it will be on really large datasets.

SELECT hash_agg(*) FROM table;
HASH_AGG(*)
-5,290,741,514,389,461,925

The first approach (LAST_ALTERED) is applicable only to Snowflake, and the second (time travel) does not seem very useful with the default retention. The hash_agg approach may be generalizable to other databases, as they have similar functions (if it's okay to compute it each time). Anyway, it's a good function to measure and see (on a Snowflake trial, it was ⚡ fast for me on the sample dataset).

@skshetry
Member

skshetry commented Sep 29, 2023

Dolt is a full-fledged MySQL-compatible database with stored procedures for Git-like operations and versioning capabilities.

See https://www.dolthub.com/blog/2023-04-19-dolt-architecture-intro/. cc @shcheklein
Also see Dolt 1.0, where they replaced the internal storage format.

There have also been some attempts at integrating Dolt with DVC.

@dberenbaum
Collaborator Author

@skshetry hash_agg() is what I used at my last company to make dvc work with Snowflake, and from discussions with others on other teams in the org, I think this was generally agreed to be the best way to do it. It worked pretty well even for large datasets AFAIR.

@dberenbaum
Collaborator Author

dberenbaum commented Sep 29, 2023

Notes from discussion with @dmpetrov and @shcheklein:

Other tools to research in this area:

We also discussed starting even simpler and leaving it to the user to decide when to check for updates and run the query again. In the basic "query and dump" use case, someone wants to run an expensive query, often across multiple tables, and dump the results to a file. DVC caches the results to recover later, and the user chooses when to update those results.

A simple example of how to do this in DVC now would be to write a script to execute the query and dump the results:

# dump.py
import pandas as pd
import sqlite3

conn = sqlite3.connect("mydb.db")
df = pd.read_sql_query("SELECT * FROM table WHERE ...", conn)
df.to_csv("dump.csv")

Then wrap that in a DVC stage:

stages:
  dump:
    cmd: python dump.py
    deps:
      - dump.py
    outs:
      - dump.csv

Running dvc repro will cache the output in a csv file and won't run again unless dump.py changes (for example, if the query is updated). If the user thinks the database tables have changed and wants to run the stage again, they can use --force to update the results.

The simplest approach we can take is to document this pattern. Pending looking deeper into the technologies above, I think anything beyond that will require us to write some database-specific functions to either:

  • auto-detect if the raw database tables have changed (the suggestion in the OP above). This may require a different query in each database flavor to detect changes.
  • dump query results to a file (like the Python script is doing in this example). This one is probably more generalizable since it's probably possible to run queries across many database flavors with something like sqlalchemy and then have some common serialization function. However, we would need some new syntax (dvc import-db --conn sqlite:... --input query.sql --output dump.csv?) to support it.

@dberenbaum
Collaborator Author

  • dump query results to a file (like the Python script is doing in this example). This one is probably more generalizable since it's probably possible to run queries across many database flavors with something like sqlalchemy and then have some common serialization function. However, we would need some new syntax (dvc import-db --conn sqlite:... --input query.sql --output dump.csv?) to support it.

Additional considerations:

  • Need to make some opinionated choices about the output format (csv/parquet/etc. and options for that file type)
  • If we use something general like sqlalchemy, my experience has been that it could be much slower than a database-specific export functionality

@skshetry
Member

skshetry commented Oct 3, 2023

How much do users care about materializing as files compared to materializing as a transient table or a view that they use in the next stage? Materializing to a file is not always possible for large data warehouses, and compute/storage is cheap anyway. Although it depends on which databases we are targeting and whether we are focused on end-to-end scenarios or not.

@dberenbaum
Collaborator Author

Discussed in the sprint meeting:

  • A downside to dumping query results to a file is that we lose more granular lineage tracking about the database tables (for example, dvc wouldn't know the dvcx dataset name or version)
  • dvc isn't the right tool for database-only etl processes, so at some point materialization is likely; OTOH, there could be ways to read data directly from database tables (like dvcx pytorch loader), which may be more practical for large datasets
  • dbt seems useful for lineage tracking inside a database and dvc is useful for files -- can we show how to connect them for full lineage that starts in a database and ends in a model or other non-database files?

@skshetry
Member

skshetry commented Oct 3, 2023

dbt seems useful for lineage tracking inside a database and dvc is useful for files -- can we show how to connect them for full lineage that starts in a database and ends in a model or other non-database files?

With dbt, you can deploy to a separate production schema (or your dev schema), so the way to convert a database table to a file is the same as for any other database; dbt is not in the picture for that step. You can track full lineage in dbt and use dvc to work with files even today. Note that a dbt repository is just a git repository, so you can easily use both dvc and dbt at the same time.

@skshetry
Member

skshetry commented Oct 5, 2023

BigQuery has last_modified_time in __tables__.

SELECT 
  table_id, 
  TIMESTAMP_MILLIS(last_modified_time) AS last_modified
FROM 
  project_id.dataset.__TABLES__

There are other approaches described in this blog post, which also mentions a way using INFORMATION_SCHEMA.TABLES, but I could neither find it documented in the TABLES view nor get it to come up in a query.

BigQuery does not have a hash_agg()-like function either, but we can probably use this: https://stackoverflow.com/questions/50428639/does-bigquery-has-an-api-to-return-a-checksum-for-a-exported-table.
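
A minimal sketch of one way to compute a whole-table fingerprint in BigQuery (in the spirit of hash_agg); the project/dataset/table names below are placeholders:

-- Hypothetical example: hash each row as JSON and fold the row hashes into one value.
SELECT BIT_XOR(FARM_FINGERPRINT(TO_JSON_STRING(t))) AS table_fingerprint
FROM `project_id.dataset.customer` AS t;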

@skshetry
Member

skshetry commented Oct 5, 2023

PostgreSQL >= 9.5 has a track_commit_timestamp setting that, when enabled, logs the commit timestamp of each transaction.

Also, this post suggests that it's not that reliable:

PostgreSQL 9.5 has commit timestamps. If you have them enabled in postgresql.conf (and did so in the past too), you can check the commit timestamp for the row with the greatest xmin to approximate the last modified time. It's only an approximation because if the most recent rows have been deleted they won't be counted.

Also, commit timestamp records are only kept for a limited time. So if you want to tell when a table that isn't modified much is modified, the answer will effectively be "dunno, a while ago".

There is no modification metadata in INFORMATION_SCHEMA.TABLES and no hash_agg-like checksum function, but a table checksum can be computed in a way similar to the BigQuery approach above.
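
A minimal sketch of both ideas, assuming track_commit_timestamp is enabled and a placeholder table named customer:

-- Approximate last-modified time: commit timestamp of the newest surviving row.
-- Only works for commits made after track_commit_timestamp was turned on.
SELECT max(pg_xact_commit_timestamp(xmin)) AS last_modified
FROM customer;

-- Rough content checksum in the spirit of hash_agg (scans the whole table).
SELECT md5(string_agg(md5(t::text), '' ORDER BY md5(t::text))) AS table_checksum
FROM customer AS t;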

@skshetry
Member

skshetry commented Oct 5, 2023

Databricks has a LAST_ALTERED field in INFORMATION_SCHEMA.TABLES.

No such field exists for views, however (not even for DDL changes).
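
For example, something along these lines should work with Unity Catalog (the catalog, schema, and table names below are placeholders):

-- Last DDL/DML change time for a table; not available for views.
SELECT table_name, last_altered
FROM my_catalog.information_schema.tables
WHERE table_schema = 'my_schema'
  AND table_name = 'customer_info';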

@skshetry
Member

skshetry commented Oct 5, 2023

MySQL has an UPDATE_TIME column in INFORMATION_SCHEMA.TABLES, but whether it is populated depends on the storage engine used. With the default engine it should exist, but it is not persisted across server restarts.

There is also a "live checksum" option that can be set when creating a table; the CHECKSUM field in INFORMATION_SCHEMA.TABLES is then populated for that table, but it might make inserts slower.
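
A sketch of the timestamp lookup, plus the on-demand CHECKSUM TABLE statement as an alternative to the live-checksum table option (schema and table names are placeholders):

-- Last modification time; may be NULL depending on the storage engine,
-- and it is reset when the server restarts.
SELECT UPDATE_TIME
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'mydb' AND TABLE_NAME = 'customer';

-- On-demand content checksum (scans the whole table).
CHECKSUM TABLE mydb.customer;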

@skshetry
Member

skshetry commented Oct 5, 2023

For SQL Server, I could only find modify_date in the sys.objects table.

modify_date datetime Date the object was last modified by using an ALTER statement. If the object is a table or a view, modify_date also changes when an index on the table or view is created or altered.

Since it is a system table, probably not everyone has access to it.

There is also a CHECKSUM_AGG function in SQL Server.
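
A sketch of both, assuming a placeholder user table named customer and access to the sys catalog:

-- Last ALTER time for a user table (type = 'U'); also updated when an index
-- on the table is created or altered.
SELECT name, modify_date
FROM sys.objects
WHERE type = 'U' AND name = 'customer';

-- Table-level checksum: aggregate the per-row CHECKSUM(*) values.
SELECT CHECKSUM_AGG(CHECKSUM(*)) AS table_checksum
FROM customer;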

@dberenbaum
Collaborator Author

dberenbaum commented Oct 6, 2023

dbt is conceptually analogous to dvc for databases. One approach that avoids having to build too much ourselves or maintain implementations for different databases is to be opinionated. In other words, we could suggest using dbt for database/data engineering/ETL version control (and see if they are also interested in partnering on this), and then only provide an integration with dbt.

For example, dbt already has a way to determine the freshness of a table that we could use to determine when data was last updated (when available).

If we want to start with simply dumping a query result, we could provide a way to specify a dbt model to dump.

Besides this approach being easier to implement, the reason to take this approach would be to have a holistic end-to-end workflow that supports versioning and reproducibility for structured data, starting from ETL all the way to model training and deployment (similar to how dvcx may be upstream of dvc for unstructured data workflows).

Edit: I should also mention downsides to this approach. It's opinionated and requires learning another fairly complex tool.

@skshetry
Member

Different databases offer different authentication and configuration mechanisms. For example, Snowflake supports username/password auth, key-pair auth, MFA, SSO, etc. Redshift supports password-based and IAM-based authentication.

Similarly, BigQuery supports OAuth, service-account-based login, etc. And these are only a few of the many databases we'd have to support. Also, each database requires its own driver/connector to run queries.

Even if we do start with password-based auth, it is likely to turn into something more complicated soon, so I don't find the complexity worth it for the import-db feature (which, in a way, is already possible via a user's script).

I am trying to look into dbt to see if we can reuse their "Connection Profiles" to run the database queries. They have the concept of adapters, which are implemented for each database; some are maintained by dbt Labs and some are community-maintained. See https://docs.getdbt.com/docs/core/connect-data-platform/about-core-connections.

Similarly, we could reuse dbt's source freshness implementation for dependencies.
(See dbt-labs/dbt-core#7012, which we would benefit from if we can somehow use dbt.)

dbt seems risky because they are primarily a CLI application, and internals may not be stable or accessible. I'll try looking deeper into their internals.

It might also be worth discussing with dbt from a broader integrations point of view.

@dberenbaum
Collaborator Author

@skshetry @shcheklein To provide a bit more concrete proposal from the thoughts above about dbt integration, let me try to start with the query and dump use case and explain how it might work with dbt.

Assumptions:

  • User relies on dbt to set up a database connection profile
  • dvc can use dbt to access the database and then write the data to a file (I'm not aware that dbt has any way to dump to files)

The user can write their query as a dbt model. This looks like a templated select query, but dbt automatically saves it to a table (it's also possible to save it as a view or a few other variations of how to materialize the model).
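
For illustration, a minimal model might look like the sketch below (the model name, source, and columns are placeholders, and it assumes a matching source definition exists in the dbt project):

-- models/customers.sql (hypothetical)
{{ config(materialized='table') }}

select
    customer_id,
    min(order_date) as first_order_date,
    count(*)        as number_of_orders
from {{ source('shop', 'orders') }}
group by customer_id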

Then they can dump the results to a dvc-tracked file:

dvc import-dbt . customers

The arguments are a path to the dbt repo and the name of the dbt model to import. This could perform a select * from customers and write those results to a file like customers.csv tracked by customers.csv.dvc (path and output format could be configurable).

Some benefits of this approach:

  • dbt handles many database connections
  • dbt enforces best practices like saving query results to a table
  • dbt can run a full database pipeline over many data sources and multiple queries
  • dvc can inspect dbt information about the table and query

Questions:

  • Should import-dbt (optionally?) perform dbt run to update the query results first? Does dbt cache the results or will it re-run the query each time?
  • Can snapshots and/or freshness be used to auto-detect updates?
  • What info can we get from dbt and what is useful?
  • What about dbt models as pipeline dependencies (for example, to stream the results of a dbt model into code and aggregate or transform in some non-sql script before writing to a file; this may conflict with dbt python models)?
  • Should there also be import-dvcx or should there be a dvcx-dbt connector that makes dvcx work with this dbt integration?
  • Is there interest in this from the dbt side?

@skshetry
Member

skshetry commented Oct 12, 2023

There is also dbt-fal, which provides a function to get a pandas dataframe from a dbt model. It's one more thing on top of dbt that the user would need to know, but it could be a good starting point to take inspiration from.

@dberenbaum
Collaborator Author

Discussed with @skshetry to move forward with a proof of concept on this that we can use to:

  • try out the workflow
  • use as a framework that dvcx can use for integration
  • reach out to dbt with something concrete to show

@dberenbaum
Collaborator Author

@skshetry Should we consider Delta Lake tables done since Databricks is a supported backend for dbt? I don't think we can prioritize another effort specific to Delta Lake right now.

@dberenbaum
Collaborator Author

Now that we have import-db, I'm going to close this and open a new one for streaming support related to #10164.
