-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CT-3231] [Bug] Warehouse connection and post-run-hook logs get conflated when using dbtRunner #8866
Comments
@racheldaniel Awesome to jam with you earlier today to try to get to the bottom of this! Python script for further explorationHere's a Python script that we can tweak to filter out just the JSON objects that we want to examine further: python script# runner.py
import json
import time
from dbt.cli.main import dbtRunner
# Project and model to examine. PROJECT_NAME is also used to build the
# on-run-end hook identifiers below; MODEL_NAME is what `dbt run --select` targets.
PROJECT_NAME = "jaffle_shop"
MODEL_NAME = "stg_customers"
# Amount of time to pause between runs (in seconds).
# NOTE(review): not referenced by any function visible in this script -- confirm
# whether it is still needed.
PAUSE_BETWEEN_RUNS = 1 # in seconds
# Event names (the "name" field under "info") that log_hook_events() keeps
EVENT_NAMES = ("NewConnection", "LogHookEndLine")
# node_name / unique_id values that node_name() and node_unique_id() compare against
NODE_NAME = f"{PROJECT_NAME}-on-run-end-0"
UNIQUE_ID = f"operation.{PROJECT_NAME}.{NODE_NAME}"
# Path to the dbt log file, read as JSON Lines (one JSON object per line)
LOG_PATH = "logs/dbt.log"
def log_hook_events(json_obj):
    """Return True when the event's info.name is one of EVENT_NAMES.

    Objects without an "info" section, or whose "info" section has no
    "name" key, do not match.
    """
    if "info" not in json_obj:
        return False
    info = json_obj["info"]
    if "name" not in info:
        return False
    return info["name"] in EVENT_NAMES
def node_name(json_obj):
    """Return True when the event's data.node_info.node_name equals NODE_NAME.

    Objects missing "data", "node_info" or "node_name" do not match.
    """
    if "data" not in json_obj:
        return False
    data = json_obj["data"]
    if "node_info" not in data:
        return False
    node_info = data["node_info"]
    if "node_name" not in node_info:
        return False
    return node_info["node_name"] == NODE_NAME
def node_unique_id(json_obj):
    """Return True when the event's data.node_info.unique_id equals UNIQUE_ID.

    Objects missing "data", "node_info" or "unique_id" do not match.
    """
    if "data" not in json_obj:
        return False
    data = json_obj["data"]
    if "node_info" not in data:
        return False
    node_info = data["node_info"]
    if "unique_id" not in node_info:
        return False
    return node_info["unique_id"] == UNIQUE_ID
def filter_jsonl(file_path, json_filter_func=log_hook_events):
    """Collect the objects in a JSON Lines file that satisfy a filter.

    Args:
        file_path: Path to a file holding one JSON document per line.
        json_filter_func: Callable that receives each parsed object and
            returns a truthy value for the objects to keep.

    Returns:
        A list of the matching parsed objects, in file order. The list is
        empty when the file does not exist.

    Blank lines are skipped, and a line that is not valid JSON is reported
    with its line number and skipped, so a single bad line does not discard
    every event that follows it.
    """
    filtered_objects = []
    try:
        # JSON text is UTF-8; do not depend on the platform's default encoding.
        with open(file_path, "r", encoding="utf-8") as file:
            for line_number, line in enumerate(file, start=1):
                if not line.strip():
                    continue
                try:
                    json_obj = json.loads(line)
                except json.JSONDecodeError as e:
                    print(
                        f"Line {line_number} of {file_path} "
                        f"does not contain valid JSON: {e}"
                    )
                    continue
                # Keep the object only when the callback evaluates to True for it
                if json_filter_func(json_obj):
                    filtered_objects.append(json_obj)
    except FileNotFoundError:
        print(f"The file {file_path} does not exist.")
    except Exception as e:
        # Best-effort debugging helper: report and return what was collected so far.
        print(f"An unexpected error occurred: {e}")
    return filtered_objects
def filter_json_logs(filter_func=log_hook_events, show=True):
    """Filter the dbt log at LOG_PATH and report how many events matched.

    Args:
        filter_func: Predicate applied to each parsed log event.
        show: When true, pretty-print every matching event to stdout.

    Returns:
        The list of matching events.
    """
    found = filter_jsonl(LOG_PATH, filter_func)
    print(f"Found {len(found)} JSON objects that match your filter")
    if not show:
        return found
    # Pretty-print each matching event
    for event in found:
        print(json.dumps(event, indent=4))
    return found
def main():
    """Run dbt for MODEL_NAME with JSON debug logging, then print the filtered events."""
    global_flags = ["--quiet", "--log-format=json", "--log-level=debug"]
    run_args = ["run", "--select", MODEL_NAME]
    runner = dbtRunner()
    runner.invoke(global_flags + run_args)
    # Pick the predicate used to filter the JSON logs. Options are:
    # - log_hook_events
    # - node_name
    # - node_unique_id
    filter_json_logs(filter_func=log_hook_events, show=True)
if __name__ == "__main__":
main() Commands:dbt clean
python runner.py
sleep 60
python runner.py Here's the full output:
Here's just the final JSON objects that we filtered for:{
"data": {
"conn_name": "master",
"conn_type": "postgres"
},
"info": {
"category": "",
"code": "E005",
"extra": {},
"invocation_id": "6f4b7e9f-7fcd-403d-a931-d20557c2a3b7",
"level": "debug",
"msg": "Acquiring new postgres connection 'master'",
"name": "NewConnection",
"pid": 41233,
"thread": "MainThread",
"ts": "2023-10-27T17:42:40.145200Z"
}
}
{
"data": {
"conn_name": "list_postgres",
"conn_type": "postgres"
},
"info": {
"category": "",
"code": "E005",
"extra": {},
"invocation_id": "6f4b7e9f-7fcd-403d-a931-d20557c2a3b7",
"level": "debug",
"msg": "Acquiring new postgres connection 'list_postgres'",
"name": "NewConnection",
"pid": 41233,
"thread": "ThreadPoolExecutor-0_0",
"ts": "2023-10-27T17:42:40.146457Z"
}
}
{
"data": {
"execution_time": 0.0040040016,
"index": 1,
"node_info": {
"materialized": "view",
"meta": {},
"node_finished_at": "2023-10-27T17:42:40.603161",
"node_name": "jaffle_shop-on-run-end-0",
"node_path": "hooks/jaffle_shop-on-run-end-0.sql",
"node_relation": {
"alias": "jaffle_shop-on-run-end-0",
"database": "postgres",
"relation_name": "",
"schema": "dbt_dbeatty"
},
"node_started_at": "2023-10-27T17:42:40.595296",
"node_status": "success",
"resource_type": "operation",
"unique_id": "operation.jaffle_shop.jaffle_shop-on-run-end-0"
},
"statement": "jaffle_shop.on-run-end.0",
"status": "SELECT 1",
"total": 1
},
"info": {
"category": "",
"code": "Q033",
"extra": {},
"invocation_id": "6f4b7e9f-7fcd-403d-a931-d20557c2a3b7",
"level": "info",
"msg": "1 of 1 OK hook: jaffle_shop.on-run-end.0 ....................................... [\u001b[32mSELECT 1\u001b[0m in 0.00s]",
"name": "LogHookEndLine",
"pid": 41233,
"thread": "MainThread",
"ts": "2023-10-27T17:42:40.603343Z"
}
}
{
"data": {
"conn_name": "master",
"conn_type": "postgres"
},
"info": {
"category": "",
"code": "E005",
"extra": {},
"invocation_id": "93134ad8-520f-49b1-8982-20e04bb9c209",
"level": "debug",
"msg": "Acquiring new postgres connection 'master'",
"name": "NewConnection",
"pid": 42018,
"thread": "MainThread",
"ts": "2023-10-27T17:43:44.788232Z"
}
}
{
"data": {
"conn_name": "list_postgres",
"conn_type": "postgres"
},
"info": {
"category": "",
"code": "E005",
"extra": {},
"invocation_id": "93134ad8-520f-49b1-8982-20e04bb9c209",
"level": "debug",
"msg": "Acquiring new postgres connection 'list_postgres'",
"name": "NewConnection",
"pid": 42018,
"thread": "ThreadPoolExecutor-0_0",
"ts": "2023-10-27T17:43:44.789530Z"
}
}
{
"data": {
"execution_time": 0.0034229755,
"index": 1,
"node_info": {
"materialized": "view",
"meta": {},
"node_finished_at": "2023-10-27T17:43:45.107894",
"node_name": "jaffle_shop-on-run-end-0",
"node_path": "hooks/jaffle_shop-on-run-end-0.sql",
"node_relation": {
"alias": "jaffle_shop-on-run-end-0",
"database": "postgres",
"relation_name": "",
"schema": "dbt_dbeatty"
},
"node_started_at": "2023-10-27T17:43:45.100886",
"node_status": "success",
"resource_type": "operation",
"unique_id": "operation.jaffle_shop.jaffle_shop-on-run-end-0"
},
"statement": "jaffle_shop.on-run-end.0",
"status": "SELECT 1",
"total": 1
},
"info": {
"category": "",
"code": "Q033",
"extra": {},
"invocation_id": "93134ad8-520f-49b1-8982-20e04bb9c209",
"level": "info",
"msg": "1 of 1 OK hook: jaffle_shop.on-run-end.0 ....................................... [\u001b[32mSELECT 1\u001b[0m in 0.00s]",
"name": "LogHookEndLine",
"pid": 42018,
"thread": "MainThread",
"ts": "2023-10-27T17:43:45.108046Z"
}
} Making tweaksThe script assumes the following project name and model name, which are configurable: PROJECT_NAME = "jaffle_shop"
MODEL_NAME = "stg_customers" If we want to change the events to filter upon, update this section: EVENT_NAMES = ("NewConnection", "LogHookEndLine") If we want to filter by node name instead of the "NewConnection" and "LogHookEndLine" events, change this: filter_json_logs(filter_func=log_hook_events, show=True) to be this instead: filter_json_logs(filter_func=node_name, show=True) |
Thank you, Doug! Per our conversation Friday, here is some clarifying information and some cleaner data for review: In the ticket description the log excerpts I pasted in the ticket body were not from the same logs as those I linked, which is likely confusing. Here I've generated logs again following the same steps in the ticket. Here are the problematic lines to note: In
This event contains a node_started_at timestamp that you'll notice is before the first log's ts timestamp 2023-10-27T16:31:35.589978Z. So where did that node_started_at timestamp come from? If we look back at the logs from the first run in run_hook_logs_1.json, we can see that the timestamp matches the LogHookStartLine from the previous run:
I bring this up because it suggests some kind of caching going on in core, but the real problem this is causing in the IDE is that we do calculations for total elapsed time based on the node_started_at and node_finished_at fields, and because that first NewConnection event is coming across as associated with the Please let me know how I can further assist! |
I see what you are saying @racheldaniel. run_hook_logs_1 -- all 7 eventsFiltering run_hook_logs_1.json for events related to Click to toggle JSON{
"info": {
"name": "WritingInjectedSQLForNode",
"msg": "Writing injected SQL for node \"operation.jaffle_shop.jaffle_shop-on-run-end-0\"",
"invocation_id": "13428cbb-a2c6-4c11-a282-feb52cfcb0e5",
"ts": "2023-10-27T16:26:26.156651Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "",
"node_status": "started"
}
}
}
{
"info": {
"name": "LogHookStartLine",
"msg": "1 of 1 START hook: jaffle_shop.on-run-end.0 .................................... [RUN]",
"invocation_id": "13428cbb-a2c6-4c11-a282-feb52cfcb0e5",
"ts": "2023-10-27T16:26:26.159236Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "",
"node_status": "started"
}
}
}
{
"info": {
"name": "ConnectionUsed",
"msg": "Using postgres connection \"master\"",
"invocation_id": "13428cbb-a2c6-4c11-a282-feb52cfcb0e5",
"ts": "2023-10-27T16:26:26.160144Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "",
"node_status": "started"
}
}
}
{
"info": {
"name": "SQLQuery",
"msg": "On master: /* {\"app\": \"dbt\", \"dbt_version\": \"1.6.6\", \"profile_name\": \"user\", \"target_name\": \"default\", \"connection_name\": \"master\"} */\nselect 1;",
"invocation_id": "13428cbb-a2c6-4c11-a282-feb52cfcb0e5",
"ts": "2023-10-27T16:26:26.160914Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "",
"node_status": "started"
}
}
}
{
"info": {
"name": "SQLQueryStatus",
"msg": "SQL status: SELECT 1 in 0.0 seconds",
"invocation_id": "13428cbb-a2c6-4c11-a282-feb52cfcb0e5",
"ts": "2023-10-27T16:26:26.192099Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "",
"node_status": "started"
}
}
}
{
"info": {
"name": "LogHookEndLine",
"msg": "1 of 1 OK hook: jaffle_shop.on-run-end.0 ....................................... [\u001b[32mSELECT 1\u001b[0m in 0.03s]",
"invocation_id": "13428cbb-a2c6-4c11-a282-feb52cfcb0e5",
"ts": "2023-10-27T16:26:26.193827Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "ConnectionClosed",
"msg": "On master: Close",
"invocation_id": "13428cbb-a2c6-4c11-a282-feb52cfcb0e5",
"ts": "2023-10-27T16:26:26.195333Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
} run_hook_logs_2 -- first 25 eventsDoing the same filter on run_hook_logs_2.json for events related to The first 25 all have Click to toggle JSON{
"info": {
"name": "NewConnection",
"msg": "Acquiring new postgres connection 'master'",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:35.955797Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "ConnectionUsed",
"msg": "Using postgres connection \"master\"",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.500301Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "SQLQuery",
"msg": "On master: BEGIN",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.501344Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "NewConnectionOpening",
"msg": "Opening a new connection, currently in state init",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.502637Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "SQLQueryStatus",
"msg": "SQL status: BEGIN in 0.0 seconds",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.689137Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "ConnectionUsed",
"msg": "Using postgres connection \"master\"",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.691572Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "SQLQuery",
"msg": "On master: /* {\"app\": \"dbt\", \"dbt_version\": \"1.6.6\", \"profile_name\": \"user\", \"target_name\": \"default\", \"connection_name\": \"master\"} */\nwith relation as (\n select\n pg_rewrite.ev_class as class,\n pg_rewrite.oid as id\n from pg_rewrite\n ),\n class as (\n select\n oid as id,\n relname as name,\n relnamespace as schema,\n relkind as kind\n from pg_class\n ),\n dependency as (\n select distinct\n pg_depend.objid as id,\n pg_depend.refobjid as ref\n from pg_depend\n ),\n schema as (\n select\n pg_namespace.oid as id,\n pg_namespace.nspname as name\n from pg_namespace\n where nspname != 'information_schema' and nspname not like 'pg\\_%'\n ),\n referenced as (\n select\n relation.id AS id,\n referenced_class.name ,\n referenced_class.schema ,\n referenced_class.kind\n from relation\n join class as referenced_class on relation.class=referenced_class.id\n where referenced_class.kind in ('r', 'v', 'm')\n ),\n relationships as (\n select\n referenced.name as referenced_name,\n referenced.schema as referenced_schema_id,\n dependent_class.name as dependent_name,\n dependent_class.schema as dependent_schema_id,\n referenced.kind as kind\n from referenced\n join dependency on referenced.id=dependency.id\n join class as dependent_class on dependency.ref=dependent_class.id\n where\n (referenced.name != dependent_class.name or\n referenced.schema != dependent_class.schema)\n )\n\n select\n referenced_schema.name as referenced_schema,\n relationships.referenced_name as referenced_name,\n dependent_schema.name as dependent_schema,\n relationships.dependent_name as dependent_name\n from relationships\n join schema as dependent_schema on relationships.dependent_schema_id=dependent_schema.id\n join schema as referenced_schema on relationships.referenced_schema_id=referenced_schema.id\n group by referenced_schema, referenced_name, dependent_schema, dependent_name\n order by referenced_schema, referenced_name, dependent_schema, dependent_name;",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.694010Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "SQLQueryStatus",
"msg": "SQL status: SELECT 17 in 0.0 seconds",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.750521Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "Rollback",
"msg": "On master: ROLLBACK",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.759279Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "ConnectionUsed",
"msg": "Using postgres connection \"master\"",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.795450Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "SQLQuery",
"msg": "On master: BEGIN",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.796619Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "SQLQueryStatus",
"msg": "SQL status: BEGIN in 0.0 seconds",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.858170Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "SQLCommit",
"msg": "On master: COMMIT",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.859935Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "ConnectionUsed",
"msg": "Using postgres connection \"master\"",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.861057Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "SQLQuery",
"msg": "On master: COMMIT",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.862185Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "SQLQueryStatus",
"msg": "SQL status: COMMIT in 0.0 seconds",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.894646Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "ConnectionClosed",
"msg": "On master: Close",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:36.896428Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "ConnectionUsed",
"msg": "Using postgres connection \"master\"",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:37.314347Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "SQLQuery",
"msg": "On master: BEGIN",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:37.315668Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "NewConnectionOpening",
"msg": "Opening a new connection, currently in state closed",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:37.316718Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "SQLQueryStatus",
"msg": "SQL status: BEGIN in 0.0 seconds",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:37.497314Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "SQLCommit",
"msg": "On master: COMMIT",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:37.498355Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "ConnectionUsed",
"msg": "Using postgres connection \"master\"",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:37.499139Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "SQLQuery",
"msg": "On master: COMMIT",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:37.499877Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
}
{
"info": {
"name": "SQLQueryStatus",
"msg": "SQL status: COMMIT in 0.0 seconds",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:37.529682Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:26:26.153592",
"node_finished_at": "2023-10-27T16:26:26.193613",
"node_status": "success"
}
}
} run_hook_logs_2 -- last 7 eventsThe final 7 have Click to toggle JSON{
"info": {
"name": "WritingInjectedSQLForNode",
"msg": "Writing injected SQL for node \"operation.jaffle_shop.jaffle_shop-on-run-end-0\"",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:37.535097Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:31:37.532037",
"node_finished_at": "",
"node_status": "started"
}
}
}
{
"info": {
"name": "LogHookStartLine",
"msg": "1 of 1 START hook: jaffle_shop.on-run-end.0 .................................... [RUN]",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:37.537988Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:31:37.532037",
"node_finished_at": "",
"node_status": "started"
}
}
}
{
"info": {
"name": "ConnectionUsed",
"msg": "Using postgres connection \"master\"",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:37.539327Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:31:37.532037",
"node_finished_at": "",
"node_status": "started"
}
}
}
{
"info": {
"name": "SQLQuery",
"msg": "On master: /* {\"app\": \"dbt\", \"dbt_version\": \"1.6.6\", \"profile_name\": \"user\", \"target_name\": \"default\", \"connection_name\": \"master\"} */\nselect 1;",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:37.540237Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:31:37.532037",
"node_finished_at": "",
"node_status": "started"
}
}
}
{
"info": {
"name": "SQLQueryStatus",
"msg": "SQL status: SELECT 1 in 0.0 seconds",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:37.569072Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:31:37.532037",
"node_finished_at": "",
"node_status": "started"
}
}
}
{
"info": {
"name": "LogHookEndLine",
"msg": "1 of 1 OK hook: jaffle_shop.on-run-end.0 ....................................... [\u001b[32mSELECT 1\u001b[0m in 0.03s]",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:37.571483Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:31:37.532037",
"node_finished_at": "2023-10-27T16:31:37.571084",
"node_status": "success"
}
}
}
{
"info": {
"name": "ConnectionClosed",
"msg": "On master: Close",
"invocation_id": "6236acc0-e604-4827-a297-d797873695df",
"ts": "2023-10-27T16:31:37.575315Z"
},
"data": {
"node_info": {
"node_name": "jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-27T16:31:37.532037",
"node_finished_at": "2023-10-27T16:31:37.571084",
"node_status": "success"
}
}
} I don't know the reason for the "extra" 25 events in I think we'll want to create a version of Could you help try to do something like this that reproduces what you are seeing? |
@dbeatty10 Sure, let me see if I can reproduce via script |
@dbeatty10 I tweaked your script so that it now does two runs, produces two output files for comparison, and creates an additional file that just has the node names, events and timestamps from those two runs. I'm doing this with postgres, and the only thing I've changed about the standard jaffle_shop project is that I added the following to my dbt_project.yml:
I'm sure not all of the changes I made were necessary -- for example, I added some cleanup of old log files just to make absolutely sure file writes had nothing to do with the duplicate logs. Let me know if this doesn't work for you for some reason! # test_runner.py
import json
import os
import time
from dbt.cli.main import dbtRunner
# Project and model to examine. Both names are used to build the output file
# names; PROJECT_NAME is also used to build the on-run-end hook identifiers below.
PROJECT_NAME = "jaffle_shop"
MODEL_NAME = "stg_orders"
# Amount of time to sleep between the two dbt runs (in seconds)
PAUSE_BETWEEN_RUNS = 10 # in seconds
# Event names (the "name" field under "info") that log_hook_events() keeps
EVENT_NAMES = ("NewConnection", "LogHookEndLine")
# node_name / unique_id values that node_name() and node_unique_id() compare against
NODE_NAME = f"{PROJECT_NAME}-on-run-end-0"
UNIQUE_ID = f"operation.{PROJECT_NAME}.{NODE_NAME}"
# Path to the dbt log file, read as JSON Lines (one JSON object per line)
LOG_PATH = "logs/dbt.log"
def log_hook_events(json_obj):
    """Return True when the event's info.name is one of EVENT_NAMES.

    Objects without an "info" section, or whose "info" section has no
    "name" key, do not match.
    """
    if "info" not in json_obj:
        return False
    info = json_obj["info"]
    if "name" not in info:
        return False
    return info["name"] in EVENT_NAMES
def node_name(json_obj):
    """Return True when the event's data.node_info.node_name equals NODE_NAME.

    Objects missing "data", "node_info" or "node_name" do not match.
    """
    if "data" not in json_obj:
        return False
    data = json_obj["data"]
    if "node_info" not in data:
        return False
    node_info = data["node_info"]
    if "node_name" not in node_info:
        return False
    return node_info["node_name"] == NODE_NAME
def node_unique_id(json_obj):
    """Return True when the event's data.node_info.unique_id equals UNIQUE_ID.

    Objects missing "data", "node_info" or "unique_id" do not match.
    """
    if "data" not in json_obj:
        return False
    data = json_obj["data"]
    if "node_info" not in data:
        return False
    node_info = data["node_info"]
    if "unique_id" not in node_info:
        return False
    return node_info["unique_id"] == UNIQUE_ID
def filter_jsonl(file_path, json_filter_func=log_hook_events):
    """Collect the objects in a JSON Lines file that satisfy a filter.

    Args:
        file_path: Path to a file holding one JSON document per line.
        json_filter_func: Callable that receives each parsed object and
            returns a truthy value for the objects to keep.

    Returns:
        A list of the matching parsed objects, in file order. The list is
        empty when the file does not exist.

    Blank lines are skipped, and a line that is not valid JSON is reported
    with its line number and skipped, so a single bad line does not discard
    every event that follows it.
    """
    filtered_objects = []
    try:
        # JSON text is UTF-8; do not depend on the platform's default encoding.
        with open(file_path, "r", encoding="utf-8") as file:
            for line_number, line in enumerate(file, start=1):
                if not line.strip():
                    continue
                try:
                    json_obj = json.loads(line)
                except json.JSONDecodeError as e:
                    print(
                        f"Line {line_number} of {file_path} "
                        f"does not contain valid JSON: {e}"
                    )
                    continue
                # Keep the object only when the callback evaluates to True for it
                if json_filter_func(json_obj):
                    filtered_objects.append(json_obj)
    except FileNotFoundError:
        print(f"The file {file_path} does not exist.")
    except Exception as e:
        # Best-effort debugging helper: report and return what was collected so far.
        print(f"An unexpected error occurred: {e}")
    return filtered_objects
def filter_json_logs(filter_func=log_hook_events, show=True, output_file=None):
    """Filter the dbt log at LOG_PATH, optionally saving and/or printing the matches.

    Args:
        filter_func: Predicate applied to each parsed log event.
        show: When true, pretty-print every matching event to stdout.
        output_file: When truthy, path that receives the matches as one
            indented JSON array.

    Returns:
        The list of matching events.
    """
    found = filter_jsonl(LOG_PATH, filter_func)
    print(f"Found {len(found)} JSON objects that match your filter")
    if output_file:
        with open(output_file, "w") as sink:
            json.dump(found, sink, indent=4)
    if not show:
        return found
    # Pretty-print each matching event
    for event in found:
        print(json.dumps(event, indent=4))
    return found
def compare_node_timestamps(file_path_1, file_path_2):
    """Condense two filtered-event files into one comparison file.

    Each event is reduced to its node name, unique id, node start/finish
    timestamps and event name. The summaries of the first file are followed
    by an END_RUN_1 marker string and then the summaries of the second file.
    The combined list is written to
    logs/{PROJECT_NAME}-{MODEL_NAME}-comparison.json and also returned.

    A failure while reading or summarizing a file is printed rather than
    raised; summaries collected before the failure are kept.
    """
    summary_path = f"logs/{PROJECT_NAME}-{MODEL_NAME}-comparison.json"
    summaries = []

    def summarize(event):
        # Reduce one event to the fields needed to compare the two runs.
        node_info = event['data']['node_info']
        return {
            'node_name': node_info['node_name'],
            'unique_id': node_info['unique_id'],
            'node_started_at': node_info['node_started_at'],
            'node_finished_at': node_info['node_finished_at'],
            'event_name': event['info']['name'],
        }

    def collect(path):
        # Append a summary for every event stored in the JSON array at `path`.
        try:
            with open(path, "r") as handle:
                events = json.load(handle)
                for event in events:
                    summaries.append(summarize(event))
        except Exception as e:
            print(f"An unexpected error occurred: {e}")

    collect(file_path_1)
    summaries.append("___________________________END_RUN_1___________________________")
    collect(file_path_2)

    with open(summary_path, "w") as handle:
        json.dump(summaries, handle, indent=4)
    return summaries
def main(output_file=None):
    """Run dbt for MODEL_NAME with JSON debug logging, then filter the log by node name.

    Args:
        output_file: Optional path handed to filter_json_logs for saving
            the matching events.
    """
    global_flags = ["--log-format=json", "--log-level=debug"]
    run_args = ["run", "--select", MODEL_NAME]
    runner = dbtRunner()
    runner.invoke(global_flags + run_args)
    # Pick the predicate used to filter the JSON logs. Options are:
    # - log_hook_events
    # - node_name
    # - node_unique_id
    filter_json_logs(filter_func=node_name, show=True, output_file=output_file)
def run_and_cleanup(output_file):
    """Delete any existing dbt log and filtered output file, then run main().

    Args:
        output_file: Path of the filtered-events file to (re)create.
    """
    # Remove leftovers first so each run starts from empty files.
    for stale_path in (LOG_PATH, output_file):
        if os.path.exists(stale_path):
            os.remove(stale_path)
    main(output_file)
if __name__ == "__main__":
output_file_1 = f"logs/{PROJECT_NAME}-{MODEL_NAME}-filtered-1.json"
run_and_cleanup(output_file_1)
time.sleep(PAUSE_BETWEEN_RUNS)
output_file_2 = f"logs/{PROJECT_NAME}-{MODEL_NAME}-filtered-2.json"
run_and_cleanup(output_file_2)
compare_node_timestamps(output_file_1, output_file_2) |
This works perfectly @racheldaniel 🤩 Here are the steps that I used locally to run this end-to-end: Reprex: Clone the Jaffle Shop repo: git clone https://github.com/dbt-labs/jaffle_shop.git
cd jaffle_shop Copy-paste the Modify Then make sure the connection works: dbt debug Add the on-run-end:
- "select 1;" Run the script: python test_runner.py Then examine the output within logs/jaffle_shop-stg_orders-comparison.json. In my local run, I can see the following, which demonstrates the issue you are describing:
{
"node_name": "jaffle_shop-on-run-end-0",
"unique_id": "operation.jaffle_shop.jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-31T16:13:21.526377",
"node_finished_at": "",
"event_name": "WritingInjectedSQLForNode"
},
"___________________________END_RUN_1___________________________",
{
"node_name": "jaffle_shop-on-run-end-0",
"unique_id": "operation.jaffle_shop.jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-31T16:13:21.526377",
"node_finished_at": "2023-10-31T16:13:21.538143",
"event_name": "SQLQueryStatus"
},
{
"node_name": "jaffle_shop-on-run-end-0",
"unique_id": "operation.jaffle_shop.jaffle_shop-on-run-end-0",
"node_started_at": "2023-10-31T16:13:31.989195",
"node_finished_at": "",
"event_name": "WritingInjectedSQLForNode"
}, |
The node_info is started and cleared by a contextmanager, so it's possible that the hooks are outside of that. It's also possible that the event_status info is getting saved and restored where it shouldn't be. |
It looks like LogHookEndLine shouldn't have node_info, because it's for an entire run, not for the node in a call_runner. The adapter's various Connection events are sometimes associated with a node and sometimes not. So I'm guessing that they are sometimes finding node_info that they shouldn't be. Still looking into that. |
Is this a new bug in dbt-core?
Current Behavior
When using a post-run hook in one's project and invoking dbt through dbtRunner, some of the data warehouse connection events (
NewConnection
,ConnectionUsed
,NewConnectionOpening
) come through in the logs as the nodehooks/{{dbt_project}}-on-run-end-0.sql
. These events containnode_started_at
andnode_finished_at
fields, and we see that thenode_started_at
time for these events will match thehooks/{{dbt_project}}-on-run-end-0.sql
node'snode_finished_at
time from the previous run .Ex. I did 2 consecutive runs of the command
dbt --log-format=json --log-level=debug run --select stg_customers
. I've attached the complete log files from those runs. Below are thenode_started_at
andnode_finished_at
times pulled from the logs for the two nodes that this command runs, including the timestamps that are shown for data warehouse events with the on-run-end-0 node name. Note the matching timestamps in boldRun 1
stg_customers
node_started_at
: 2023-10-17T15:39:34.591460on-run-end-0
node_started_at
NewConnection: 2023-10-17T15:39:29.441266on-run-end-0
node_started_at
LogHookStartLine: 2023-10-17T15:39:35.209166stg_customers
node_finished_at
: 2023-10-17T15:39:34.974192on-run-end-0
node_finished_at
NewConnection: 2023-10-17T15:39:29.504932on-run-end-0
node_finished_at
LogHookEndLine: 2023-10-17T15:39:35.263544Run 2
stg_customers
node_started_at
: 2023-10-17T15:52:40.656494on-run-end-0
node_started_at
NewConnection: 2023-10-17T15:39:35.209166on-run-end-0
node_started_at
LogHookEndLine: 2023-10-17T15:52:41.374772stg_customers
node_finished_at
: 2023-10-17T15:52:41.134269on-run-end-0
node_finished_at
NewConnection: 2023-10-17T15:39:35.263544on-run-end-0
node_finished_at
LogHookEndLine: 2023-10-17T15:52:41.436292dbt-run-1.log
dbt-run-2.log
Expected Behavior
I would not expect to see any node data in a NewConnection event at all, much less node data that seems to reflect a different node from a prior run.
This is what the NewConnection events look like when using
dbtRunner
to execute:And here is what the same NewConnection events look like when running dbt normally from the command line:
Steps To Reproduce
dbt_project.yml
:NewConnection
event is associated with theon-run-end
nodenode_started_at
timestamp for thisNewConnection
event matches thenode_finished_at
timestamp from the previous run'sLogHookEndLine
eventRelevant log output
No response
Environment
Which database adapter are you using with dbt?
postgres
Additional Context
No response
The text was updated successfully, but these errors were encountered: