Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added callback function for getting profilers #393

Merged
merged 10 commits into from
Jan 17, 2022
11 changes: 7 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

### Bug Fixes
1. [#375](https://github.com/influxdata/influxdb-client-python/pull/375): Construct `InfluxDBError` without HTTP response
1. [#378](https://github.com/influxdata/influxdb-client-python/pull/378): Correct serialization DataFrame with nan values [DataFrame]
1. [#384](https://github.com/influxdata/influxdb-client-python/pull/384): Timeout can be specified as a `float`
1. [#380](https://github.com/influxdata/influxdb-client-python/pull/380): Correct data types for querying [DataFrame]
1. [#391](https://github.com/influxdata/influxdb-client-python/pull/391): Ping function uses debug for log
2. [#378](https://github.com/influxdata/influxdb-client-python/pull/378): Correct serialization DataFrame with nan values [DataFrame]
3. [#384](https://github.com/influxdata/influxdb-client-python/pull/384): Timeout can be specified as a `float`
4. [#380](https://github.com/influxdata/influxdb-client-python/pull/380): Correct data types for querying [DataFrame]
5. [#391](https://github.com/influxdata/influxdb-client-python/pull/391): Ping function uses debug for log
michaelahojna marked this conversation as resolved.
Show resolved Hide resolved

### Features
1. [#393](https://github.com/influxdata/influxdb-client-python/pull/393): Added callback function for getting profilers with example and test
michaelahojna marked this conversation as resolved.
Show resolved Hide resolved

### CI
1. [#370](https://github.com/influxdata/influxdb-client-python/pull/370): Add Python 3.10 to CI builds
Expand Down
42 changes: 42 additions & 0 deletions examples/query_with_profilers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.flux_table import FluxRecord
michaelahojna marked this conversation as resolved.
Show resolved Hide resolved
from influxdb_client.client.query_api import QueryOptions
from influxdb_client.client.write_api import SYNCHRONOUS

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True) as client:

"""
Define callback to process profiler results.
"""
class ProfilersCallback(object):
def __init__(self):
self.records = []

def __call__(self, flux_record):
self.records.append(flux_record.values)


callback = ProfilersCallback()

write_api = client.write_api(write_options=SYNCHRONOUS)

"""
Prepare data
"""
_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)
write_api.write(bucket="my-bucket", record=[_point1, _point2])

"""
Pass callback to QueryOptions
"""
query_api = client.query_api(
query_options=QueryOptions(profilers=["query", "operator"], profiler_callback=callback))

"""
Perform query
"""
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for profiler in callback.records:
print(f'Custom processing of profiler result: {profiler}')
michaelahojna marked this conversation as resolved.
Show resolved Hide resolved
31 changes: 17 additions & 14 deletions influxdb_client/client/flux_csv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ class FluxCsvParser(object):
"""Parse to processing response from InfluxDB to FluxStructures or DataFrame."""

def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode,
data_frame_index: List[str] = None, profilers: List[str] = None) -> None:
data_frame_index: List[str] = None, query_options: object = None) -> None:
michaelahojna marked this conversation as resolved.
Show resolved Hide resolved
"""Initialize defaults."""
self._response = response
self.tables = []
self._serialization_mode = serialization_mode
self._data_frame_index = data_frame_index
self._data_frame_values = []
self._profilers = profilers
self._profilers = query_options.profilers if query_options is not None else None
self._profiler_callback = query_options.profiler_callback if query_options is not None else None
pass

def __enter__(self):
Expand Down Expand Up @@ -289,16 +290,18 @@ def table_list(self) -> List[FluxTable]:
else:
return list(filter(lambda table: not self._is_profiler_table(table), self.tables))

@staticmethod
def _print_profiler_info(flux_record: FluxRecord):
def _print_profiler_info(self, flux_record: FluxRecord):
if flux_record.get_measurement().startswith("profiler/"):
msg = "Profiler: " + flux_record.get_measurement()
print("\n" + len(msg) * "=")
print(msg)
print(len(msg) * "=")
for name in flux_record.values:
val = flux_record[name]
if isinstance(val, str) and len(val) > 50:
print(f"{name:<20}: \n\n{val}")
elif val is not None:
print(f"{name:<20}: {val:<20}")
if self._profiler_callback:
self._profiler_callback(flux_record)
else:
msg = "Profiler: " + flux_record.get_measurement()
print("\n" + len(msg) * "=")
print(msg)
print(len(msg) * "=")
for name in flux_record.values:
val = flux_record[name]
if isinstance(val, str) and len(val) > 50:
print(f"{name:<20}: \n\n{val}")
elif val is not None:
print(f"{name:<20}: {val:<20}")
22 changes: 13 additions & 9 deletions influxdb_client/client/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@
from influxdb_client.client.flux_table import FluxTable, FluxRecord
from influxdb_client.client.util.date_utils import get_date_helper
from influxdb_client.client.util.helpers import get_org_query_param
from typing import Callable
michaelahojna marked this conversation as resolved.
Show resolved Hide resolved


class QueryOptions(object):
"""Query options."""

def __init__(self, profilers: List[str] = None) -> None:
def __init__(self, profilers: List[str] = None, profiler_callback: Callable = None) -> None:
"""
Initialize query options.

:param profilers: list of enabled flux profilers
:param profiler_callback: callback function return profilers (FluxRecord)
"""
self.profilers = profilers
self.profiler_callback = profiler_callback


class QueryApi(object):
Expand Down Expand Up @@ -101,7 +104,7 @@ def query(self, query: str, org=None, params: dict = None) -> List['FluxTable']:
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables,
profilers=self._profilers())
query_options=self._get_query_options())

list(_parser.generator())

Expand All @@ -123,7 +126,7 @@ def query_stream(self, query: str, org=None, params: dict = None) -> Generator['
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
async_req=False, _preload_content=False, _return_http_data_only=False)
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream,
profilers=self._profilers())
query_options=self._get_query_options())

return _parser.generator()

Expand Down Expand Up @@ -176,17 +179,18 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
data_frame_index=data_frame_index,
profilers=self._profilers())
query_options=self._get_query_options())
return _parser.generator()

def _profilers(self):
def _get_query_options(self):
if self._query_options and self._query_options.profilers:
return self._query_options.profilers
else:
return self._influxdb_client.profilers
return self._query_options
elif self._influxdb_client.profilers:
return QueryOptions(profilers=self._influxdb_client.profilers)

def _create_query(self, query, dialect=default_dialect, params: dict = None):
profilers = self._profilers()
query_options = self._get_query_options()
profilers = query_options.profilers if query_options is not None else None
q = Query(query=query, dialect=dialect, extern=QueryApi._build_flux_ast(params, profilers))

if profilers:
Expand Down
21 changes: 21 additions & 0 deletions tests/test_QueryApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,27 @@ def test_query_profiler_present(self):
print(f"Profiler record: {flux_record}")
self.assertTrue(found_profiler_records)

def test_profilers_callback(self):

class ProfilersCallback(object):
def __init__(self):
self.records = []

def __call__(self, flux_record):
self.records.append(flux_record.values)

def get_record(self, num, val):
return (self.records[num])[val]

callback = ProfilersCallback()

query_api = self.client.query_api(query_options=QueryOptions(profilers=["query", "operator"],
profiler_callback=callback))
query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

self.assertEqual("profiler/query", callback.get_record(0, "_measurement"))
self.assertEqual("profiler/operator", callback.get_record(1, "_measurement"))

def test_profiler_ast(self):

expect = {
Expand Down