diff --git a/dbt/main.py b/dbt/main.py
index 87271a81242..c6165c86b44 100644
--- a/dbt/main.py
+++ b/dbt/main.py
@@ -43,6 +43,8 @@ def handle(args):
     # correct profiles.yml file
     if not config.send_anonymous_usage_stats(parsed.profiles_dir):
         dbt.tracking.do_not_track()
+    else:
+        dbt.tracking.initialize_tracking()
 
     res = run_from_args(parsed)
     dbt.tracking.flush()
@@ -92,10 +94,13 @@ def run_from_args(parsed):
         log_path = proj.get('log-path', 'logs')
 
     initialize_logger(parsed.debug, log_path)
+    logger.debug("Tracking: {}".format(dbt.tracking.active_user.state()))
 
     dbt.tracking.track_invocation_start(project=proj, args=parsed)
+
+    result = None
     try:
-        return task.run()
+        result = task.run()
         dbt.tracking.track_invocation_end(
             project=proj, args=parsed, result_type="ok", result=None
         )
@@ -110,6 +115,8 @@ def run_from_args(parsed):
         )
         raise
 
+    return result
+
 
 def invoke_dbt(parsed):
     task = None
diff --git a/dbt/runner.py b/dbt/runner.py
index 5c1fa904593..77c31d12960 100644
--- a/dbt/runner.py
+++ b/dbt/runner.py
@@ -412,7 +412,7 @@ def call_table_exists(schema, table):
 
         self.context = {
             "run_started_at": datetime.now(),
-            "invocation_id": dbt.tracking.invocation_id,
+            "invocation_id": dbt.tracking.active_user.invocation_id,
             "get_columns_in_table": call_get_columns_in_table,
             "get_missing_columns": call_get_missing_columns,
             "already_exists": call_table_exists,
@@ -575,8 +575,9 @@ def on_complete(run_model_results):
                         run_model_result.execution_time
                     )
 
+                    invocation_id = dbt.tracking.active_user.invocation_id
                     dbt.tracking.track_model_run({
-                        "invocation_id": dbt.tracking.invocation_id,
+                        "invocation_id": invocation_id,
                         "index": index,
                         "total": num_models,
                         "execution_time": run_model_result.execution_time,
diff --git a/dbt/tracking.py b/dbt/tracking.py
index 8392168ea55..7e2b02be2f6 100644
--- a/dbt/tracking.py
+++ b/dbt/tracking.py
@@ -31,35 +31,74 @@
 emitter = Emitter(COLLECTOR_URL, protocol=COLLECTOR_PROTOCOL, buffer_size=1)
 tracker = Tracker(emitter, namespace="cf", app_id="dbt")
 
+active_user = None
 
-def __write_user():
-    user = {
-        "id": str(uuid.uuid4())
-    }
 
-    cookie_dir = os.path.dirname(COOKIE_PATH)
-    if not os.path.exists(cookie_dir):
-        os.makedirs(cookie_dir)
+class User(object):
+    """Tracking state for the current dbt invocation.
+
+    A new instance is opted out of tracking; `initialize` turns tracking
+    on and fills in `id` and `invocation_id`.
+    """
 
-    with open(COOKIE_PATH, "w") as fh:
-        yaml.dump(user, fh)
+    def __init__(self):
+        self.do_not_track = True
 
-    return user
+        self.id = None
+        self.invocation_id = None
 
+    def state(self):
+        """Return a human-readable description of the tracking state."""
+        return "do not track" if self.do_not_track else "tracking"
 
-def get_user():
-    if os.path.isfile(COOKIE_PATH):
-        with open(COOKIE_PATH, "r") as fh:
-            try:
-                user = yaml.safe_load(fh)
-                if user is None:
-                    user = __write_user()
-            except yaml.reader.ReaderError as e:
-                user = __write_user()
-    else:
-        user = __write_user()
+    def initialize(self):
+        """Enable tracking and register this user with the tracker."""
+        self.do_not_track = False
+
+        self.invocation_id = str(uuid.uuid4())
+
+        cookie = self.get_cookie()
+        self.id = cookie.get('id')
+
+        subject = Subject()
+        subject.set_user_id(self.id)
+        tracker.set_subject(subject)
+
+    def set_cookie(self):
+        """Write a new cookie with a random id and return its contents."""
+        cookie_dir = os.path.dirname(COOKIE_PATH)
+        user = {"id": str(uuid.uuid4())}
+
+        if not os.path.exists(cookie_dir):
+            os.makedirs(cookie_dir)
 
-    return user
+        with open(COOKIE_PATH, "w") as fh:
+            yaml.dump(user, fh)
+
+        return user
+
+    def get_cookie(self):
+        """Return the cookie contents, regenerating the cookie if needed.
+
+        A missing, empty, unparseable or non-mapping cookie file is
+        replaced via `set_cookie`, so the result always supports `.get`.
+        """
+        user = None
+        if os.path.isfile(COOKIE_PATH):
+            with open(COOKIE_PATH, "r") as fh:
+                try:
+                    user = yaml.safe_load(fh)
+                except yaml.YAMLError:
+                    # Any YAML failure (not only ReaderError) means the
+                    # cookie is corrupt; fall through and rewrite it.
+                    user = None
+
+        # `initialize` calls `cookie.get('id')`, so a scalar or list that
+        # parsed successfully must be discarded as well.
+        if not isinstance(user, dict):
+            user = self.set_cookie()
+
+        return user
 
 
 def get_options(args):
@@ -75,11 +114,11 @@ def get_run_type(args):
     return 'regular'
 
 
-def get_invocation_context(invocation_id, user, project, args):
+def get_invocation_context(user, project, args):
     return {
         "project_id": None if project is None else project.hashed_name(),
-        "user_id": user.get("id", None),
-        "invocation_id": invocation_id,
+        "user_id": user.id,
+        "invocation_id": user.invocation_id,
 
         "command": args.which,
         "options": get_options(args),
@@ -89,8 +128,8 @@ def get_invocation_context(invocation_id, user, project, args):
     }
 
 
-def get_invocation_start_context(invocation_id, user, project, args):
-    data = get_invocation_context(invocation_id, user, project, args)
+def get_invocation_start_context(user, project, args):
+    data = get_invocation_context(user, project, args)
 
     start_data = {
         "progress": "start",
@@ -102,10 +141,8 @@ def get_invocation_start_context(invocation_id, user, project, args):
     return SelfDescribingJson(INVOCATION_SPEC, data)
 
 
-def get_invocation_end_context(
-        invocation_id, user, project, args, result_type, result
-):
-    data = get_invocation_context(invocation_id, user, project, args)
+def get_invocation_end_context(user, project, args, result_type, result):
+    data = get_invocation_context(user, project, args)
 
     start_data = {
         "progress": "end",
@@ -117,10 +154,8 @@ def get_invocation_end_context(
     return SelfDescribingJson(INVOCATION_SPEC, data)
 
 
-def get_invocation_invalid_context(
-        invocation_id, user, project, args, result_type, result
-):
-    data = get_invocation_context(invocation_id, user, project, args)
+def get_invocation_invalid_context(user, project, args, result_type, result):
+    data = get_invocation_context(user, project, args)
 
     start_data = {
         "progress": "invalid",
@@ -155,20 +190,9 @@ def get_dbt_env_context():
 
     return SelfDescribingJson(INVOCATION_ENV_SPEC, data)
 
-invocation_id = str(uuid.uuid4())
-platform_context = get_platform_context()
-env_context = get_dbt_env_context()
-
-user = get_user()
-subject = Subject()
-subject.set_user_id(user.get("id", None))
-tracker.set_subject(subject)
-
-__is_do_not_track = False
-
 
-def track(*args, **kwargs):
-    if __is_do_not_track:
+def track(user, *args, **kwargs):
+    if user.do_not_track:
         return
     else:
         logger.debug("Sending event: {}".format(kwargs))
@@ -181,20 +205,29 @@ def track(*args, **kwargs):
 
 
 def track_invocation_start(project=None, args=None):
-    invocation_context = get_invocation_start_context(
-        invocation_id, user, project, args
+    context = [
+        get_invocation_start_context(active_user, project, args),
+        get_platform_context(),
+        get_dbt_env_context()
+    ]
+
+    track(
+        active_user,
+        category="dbt",
+        action='invocation',
+        label='start',
+        context=context
     )
-    context = [invocation_context, platform_context, env_context]
-    track(category="dbt", action='invocation', label='start', context=context)
 
 
 def track_model_run(options):
     context = [SelfDescribingJson(RUN_MODEL_SPEC, options)]
     model_id = options['model_id']
     track(
+        active_user,
         category="dbt",
         action='run_model',
-        label=invocation_id,
+        label=active_user.invocation_id,
         context=context
     )
 
@@ -202,22 +235,46 @@ def track_model_run(options):
 def track_invocation_end(
         project=None, args=None, result_type=None, result=None
 ):
-    invocation_context = get_invocation_end_context(
-        invocation_id, user, project, args, result_type, result
+    user = active_user
+    context = [
+        get_invocation_end_context(user, project, args, result_type, result),
+        get_platform_context(),
+        get_dbt_env_context()
+    ]
+    track(
+        active_user,
+        category="dbt",
+        action='invocation',
+        label='end',
+        context=context
     )
-    context = [invocation_context, platform_context, env_context]
-    track(category="dbt", action='invocation', label='end', context=context)
 
 
 def track_invalid_invocation(
         project=None, args=None, result_type=None, result=None
 ):
+
+    user = active_user
     invocation_context = get_invocation_invalid_context(
-        invocation_id, user, project, args, result_type, result
+        user,
+        project,
+        args,
+        result_type,
+        result
     )
-    context = [invocation_context, platform_context, env_context]
+
+    context = [
+        invocation_context,
+        get_platform_context(),
+        get_dbt_env_context()
+    ]
+
     track(
-        category="dbt", action='invocation', label='invalid', context=context
+        active_user,
+        category="dbt",
+        action='invocation',
+        label='invalid',
+        context=context
     )
 
 
@@ -227,6 +284,13 @@ def flush():
 
 
 def do_not_track():
-    global __is_do_not_track
-    logger.debug("Not sending anonymous usage events")
-    __is_do_not_track = True
+    """Install a non-tracking user so that no usage events are sent."""
+    global active_user
+    active_user = User()
+
+
+def initialize_tracking():
+    """Install a user with tracking enabled as the active user."""
+    global active_user
+    active_user = User()
+    active_user.initialize()