Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-644849: Add telemetry about imported pacakages at runtime #1236

Merged
merged 5 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/snowflake/connector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
apilevel = "2.0"
threadsafety = 2
paramstyle = "pyformat"
log_imported_packages_in_telemetry = True
sfc-gh-mkeller marked this conversation as resolved.
Show resolved Hide resolved

import logging
from logging import NullHandler
Expand Down Expand Up @@ -90,4 +91,5 @@ def Connect(**kwargs) -> SnowflakeConnection:
"ROWID",
# Extended data type (experimental)
"Json",
"log_imported_packages_in_telemetry",
]
46 changes: 45 additions & 1 deletion src/snowflake/connector/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@
SnowflakeRestful,
)
from .sqlstate import SQLSTATE_CONNECTION_NOT_EXISTS, SQLSTATE_FEATURE_NOT_SUPPORTED
from .telemetry import TelemetryClient
from .telemetry import (
TelemetryClient,
TelemetryData,
TelemetryField,
generate_telemetry_data,
)
from .telemetry_oob import TelemetryService
from .time_util import HeartBeatTimer, get_time_millis
from .util_text import construct_hostname, parse_account, split_statements
Expand Down Expand Up @@ -192,6 +197,10 @@ def DefaultConverterClass():
None,
(type(None), str),
), # Path to connection diag whitelist json
"log_imported_packages_in_telemetry": (
None,
(type(None), bool),
sfc-gh-mkeller marked this conversation as resolved.
Show resolved Hide resolved
), # Whether to log imported packages in telemetry
}

APPLICATION_RE = re.compile(r"[\w\d_]+")
Expand Down Expand Up @@ -292,6 +301,9 @@ def __init__(self, **kwargs):
self.connect(**kwargs)
self._telemetry = TelemetryClient(self._rest)

# get the imported modules from sys.modules
self._log_telemetry_imported_packages()

def __del__(self): # pragma: no cover
try:
self.close(retry=False)
Expand Down Expand Up @@ -953,6 +965,13 @@ def __config(self, **kwargs):
)
self._use_openssl_only = os.environ["SF_USE_OPENSSL_ONLY"] == "True"

if self._log_imported_packages_in_telemetry is None:
import snowflake.connector

self._log_imported_packages_in_telemetry = (
snowflake.connector.log_imported_packages_in_telemetry
)

def cmd_query(
self,
sql: str,
Expand Down Expand Up @@ -1541,3 +1560,28 @@ def _all_async_queries_finished(self) -> bool:
not self.is_still_running(self.get_query_status(q)) for q in queries
)
return all(finished_async_queries)

def _log_telemetry_imported_packages(self) -> None:
if self._log_imported_packages_in_telemetry:
# filter out duplicates caused by submodules
# and internal modules with names starting with an underscore
imported_modules = {
k.split(".", maxsplit=1)[0]
sfc-gh-jdu marked this conversation as resolved.
Show resolved Hide resolved
for k in sys.modules.keys()
if not k.startswith("_")
}
ts = get_time_millis()
self._log_telemetry(
TelemetryData(
generate_telemetry_data(
from_dict={
TelemetryField.KEY_TYPE.value: TelemetryField.IMPORTED_PACKAGES.value,
TelemetryField.KEY_SOURCE.value: self.application
if self.application
else CLIENT_NAME,
sfc-gh-jdu marked this conversation as resolved.
Show resolved Hide resolved
TelemetryField.KEY_VALUE.value: str(imported_modules),
}
),
ts,
)
)
2 changes: 2 additions & 0 deletions src/snowflake/connector/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class TelemetryField(Enum):
# fetch_arrow_* usage
ARROW_FETCH_ALL = "client_fetch_arrow_all"
ARROW_FETCH_BATCHES = "client_fetch_arrow_batches"
# imported packages along with client
IMPORTED_PACKAGES = "client_imported_packages"
# Keys for telemetry data sent through either in-band or out-of-band telemetry
KEY_TYPE = "type"
KEY_SOURCE = "source"
Expand Down
71 changes: 71 additions & 0 deletions test/integ/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from snowflake.connector.errors import Error, ForbiddenError
from snowflake.connector.network import APPLICATION_SNOWSQL, ReauthenticationRequest
from snowflake.connector.sqlstate import SQLSTATE_FEATURE_NOT_SUPPORTED
from snowflake.connector.telemetry import TelemetryField

try: # pragma: no cover
from parameters import CONNECTION_PARAMETERS_ADMIN
Expand Down Expand Up @@ -1107,3 +1108,73 @@ def test_ocsp_cache_working(conn_cnx):
with conn_cnx() as cnx:
assert cnx
assert OCSP_CACHE.telemetry["hit"] + OCSP_CACHE.telemetry["miss"] > original_count


@pytest.mark.skipolddriver
def test_imported_packages_telemetry(conn_cnx, capture_sf_telemetry, db_parameters):
try:
# these imports are not used but for testing
import html.parser # noqa: F401
import json # noqa: F401
import multiprocessing as mp # noqa: F401
from datetime import date # noqa: F401
from math import sqrt # noqa: F401

with conn_cnx() as conn, capture_sf_telemetry.patch_connection(
conn, False
) as telemetry_test:
conn._log_telemetry_imported_packages()
assert len(telemetry_test.records) > 0
for t in telemetry_test.records:
if (
t.message[TelemetryField.KEY_TYPE.value]
== TelemetryField.IMPORTED_PACKAGES.value
):
assert "pytest" in t.message["value"]
assert "unittest" in t.message["value"]
assert "json" in t.message["value"]
assert "multiprocessing" in t.message["value"]
assert "html" in t.message["value"]
assert "datetime" in t.message["value"]
assert "math" in t.message["value"]
assert "__main__" not in t.message["value"]
assert CLIENT_NAME == t.message[TelemetryField.KEY_SOURCE.value]

# test different application
new_application_name = "PythonSnowpark"
config = {
"user": db_parameters["user"],
"password": db_parameters["password"],
"host": db_parameters["host"],
"port": db_parameters["port"],
"account": db_parameters["account"],
"schema": db_parameters["schema"],
"database": db_parameters["database"],
"protocol": db_parameters["protocol"],
"timezone": "UTC",
"application": new_application_name,
}
with snowflake.connector.connect(
**config
) as conn, capture_sf_telemetry.patch_connection(conn, False) as telemetry_test:
conn._log_telemetry_imported_packages()
assert len(telemetry_test.records) > 0
for t in telemetry_test.records:
if (
t.message[TelemetryField.KEY_TYPE.value]
== TelemetryField.IMPORTED_PACKAGES.value
):
assert (
new_application_name
== t.message[TelemetryField.KEY_SOURCE.value]
)
sfc-gh-mkeller marked this conversation as resolved.
Show resolved Hide resolved

# test opt out
snowflake.connector.log_imported_packages_in_telemetry = False
with conn_cnx() as conn, capture_sf_telemetry.patch_connection(
conn, False
) as telemetry_test:
conn._log_telemetry_imported_packages()
assert len(telemetry_test.records) == 0
finally:
snowflake.connector.log_imported_packages_in_telemetry = True