Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added callback function for getting profilers #393

Merged
merged 10 commits into from
Jan 17, 2022
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
1. [#378](https://github.com/influxdata/influxdb-client-python/pull/378): Correct serialization DataFrame with nan values [DataFrame]
1. [#384](https://github.com/influxdata/influxdb-client-python/pull/384): Timeout can be specified as a `float`
1. [#380](https://github.com/influxdata/influxdb-client-python/pull/380): Correct data types for querying [DataFrame]
1. [#391](https://github.com/influxdata/influxdb-client-python/pull/391): Ping function uses debug for log
1. [#391](https://github.com/influxdata/influxdb-client-python/pull/391): Ping function uses debug for log

### Features
1. [#393](https://github.com/influxdata/influxdb-client-python/pull/393): Added callback function for getting profilers output with example and test

### CI
1. [#370](https://github.com/influxdata/influxdb-client-python/pull/370): Add Python 3.10 to CI builds
Expand Down
30 changes: 30 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,36 @@ Example of a profiler output:
DurationSum : 940500
MeanDuration : 940500.0

You can also use callback function to get profilers output.
Return value of this callback is type of FluxRecord.

Example how to use profilers with callback:

.. code-block:: python

class ProfilersCallback(object):
def __init__(self):
self.records = []

def __call__(self, flux_record):
self.records.append(flux_record.values)

callback = ProfilersCallback()

query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"], profiler_callback=callback))
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for profiler in callback.records:
print(f'Custom processing of profiler result: {profiler}')

Example output of this callback:

.. code-block::

Custom processing of profiler result: {'result': '_profiler', 'table': 0, '_measurement': 'profiler/query', 'TotalDuration': 18843792, 'CompileDuration': 1078666, 'QueueDuration': 93375, 'PlanDuration': 0, 'RequeueDuration': 0, 'ExecuteDuration': 17371000, 'Concurrency': 0, 'MaxAllocated': 448, 'TotalAllocated': 0, 'RuntimeErrors': None, 'flux/query-plan': 'digraph {\r\n ReadRange2\r\n generated_yield\r\n\r\n ReadRange2 -> generated_yield\r\n}\r\n\r\n', 'influxdb/scanned-bytes': 0, 'influxdb/scanned-values': 0}
Custom processing of profiler result: {'result': '_profiler', 'table': 1, '_measurement': 'profiler/operator', 'Type': '*influxdb.readFilterSource', 'Label': 'ReadRange2', 'Count': 1, 'MinDuration': 3274084, 'MaxDuration': 3274084, 'DurationSum': 3274084, 'MeanDuration': 3274084.0}


.. marker-index-end


Expand Down
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV`
- [query_from_file.py](query_from_file.py) - How to use a Flux query defined in a separate file
- [query_response_to_json.py](query_response_to_json.py) - How to serialize Query response to JSON
- [query_with_profilers.py](query_with_profilers.py) - How to process profilers output by callback


## Management API
Expand Down
41 changes: 41 additions & 0 deletions examples/query_with_profilers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.query_api import QueryOptions
from influxdb_client.client.write_api import SYNCHRONOUS

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True) as client:

"""
Define callback to process profiler results.
"""
class ProfilersCallback(object):
def __init__(self):
self.records = []

def __call__(self, flux_record):
self.records.append(flux_record.values)


callback = ProfilersCallback()

write_api = client.write_api(write_options=SYNCHRONOUS)

"""
Prepare data
"""
_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)
write_api.write(bucket="my-bucket", record=[_point1, _point2])

"""
Pass callback to QueryOptions
"""
query_api = client.query_api(
query_options=QueryOptions(profilers=["query", "operator"], profiler_callback=callback))

"""
Perform query
"""
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for profiler in callback.records:
print(f'Custom processing of profiler result: {profiler}')
31 changes: 17 additions & 14 deletions influxdb_client/client/flux_csv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ class FluxCsvParser(object):
"""Parse to processing response from InfluxDB to FluxStructures or DataFrame."""

def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode,
data_frame_index: List[str] = None, profilers: List[str] = None) -> None:
data_frame_index: List[str] = None, query_options=None) -> None:
"""Initialize defaults."""
self._response = response
self.tables = []
self._serialization_mode = serialization_mode
self._data_frame_index = data_frame_index
self._data_frame_values = []
self._profilers = profilers
self._profilers = query_options.profilers if query_options is not None else None
self._profiler_callback = query_options.profiler_callback if query_options is not None else None
pass

def __enter__(self):
Expand Down Expand Up @@ -289,16 +290,18 @@ def table_list(self) -> List[FluxTable]:
else:
return list(filter(lambda table: not self._is_profiler_table(table), self.tables))

@staticmethod
def _print_profiler_info(flux_record: FluxRecord):
def _print_profiler_info(self, flux_record: FluxRecord):
if flux_record.get_measurement().startswith("profiler/"):
msg = "Profiler: " + flux_record.get_measurement()
print("\n" + len(msg) * "=")
print(msg)
print(len(msg) * "=")
for name in flux_record.values:
val = flux_record[name]
if isinstance(val, str) and len(val) > 50:
print(f"{name:<20}: \n\n{val}")
elif val is not None:
print(f"{name:<20}: {val:<20}")
if self._profiler_callback:
self._profiler_callback(flux_record)
else:
msg = "Profiler: " + flux_record.get_measurement()
print("\n" + len(msg) * "=")
print(msg)
print(len(msg) * "=")
for name in flux_record.values:
val = flux_record[name]
if isinstance(val, str) and len(val) > 50:
print(f"{name:<20}: \n\n{val}")
elif val is not None:
print(f"{name:<20}: {val:<20}")
23 changes: 13 additions & 10 deletions influxdb_client/client/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import codecs
import csv
from datetime import datetime, timedelta
from typing import List, Generator, Any, Union, Iterable
from typing import List, Generator, Any, Union, Iterable, Callable

from influxdb_client import Dialect, IntegerLiteral, BooleanLiteral, FloatLiteral, DateTimeLiteral, StringLiteral, \
VariableAssignment, Identifier, OptionStatement, File, DurationLiteral, Duration, UnaryExpression, Expression, \
Expand All @@ -22,13 +22,15 @@
class QueryOptions(object):
"""Query options."""

def __init__(self, profilers: List[str] = None) -> None:
def __init__(self, profilers: List[str] = None, profiler_callback: Callable = None) -> None:
"""
Initialize query options.

:param profilers: list of enabled flux profilers
:param profiler_callback: callback function return profilers (FluxRecord)
"""
self.profilers = profilers
self.profiler_callback = profiler_callback


class QueryApi(object):
Expand Down Expand Up @@ -101,7 +103,7 @@ def query(self, query: str, org=None, params: dict = None) -> List['FluxTable']:
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables,
profilers=self._profilers())
query_options=self._get_query_options())

list(_parser.generator())

Expand All @@ -123,7 +125,7 @@ def query_stream(self, query: str, org=None, params: dict = None) -> Generator['
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
async_req=False, _preload_content=False, _return_http_data_only=False)
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream,
profilers=self._profilers())
query_options=self._get_query_options())

return _parser.generator()

Expand Down Expand Up @@ -176,17 +178,18 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
data_frame_index=data_frame_index,
profilers=self._profilers())
query_options=self._get_query_options())
return _parser.generator()

def _profilers(self):
def _get_query_options(self):
if self._query_options and self._query_options.profilers:
return self._query_options.profilers
else:
return self._influxdb_client.profilers
return self._query_options
elif self._influxdb_client.profilers:
return QueryOptions(profilers=self._influxdb_client.profilers)

def _create_query(self, query, dialect=default_dialect, params: dict = None):
profilers = self._profilers()
query_options = self._get_query_options()
profilers = query_options.profilers if query_options is not None else None
q = Query(query=query, dialect=dialect, extern=QueryApi._build_flux_ast(params, profilers))

if profilers:
Expand Down
21 changes: 21 additions & 0 deletions tests/test_QueryApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,27 @@ def test_query_profiler_present(self):
print(f"Profiler record: {flux_record}")
self.assertTrue(found_profiler_records)

def test_profilers_callback(self):

class ProfilersCallback(object):
def __init__(self):
self.records = []

def __call__(self, flux_record):
self.records.append(flux_record.values)

def get_record(self, num, val):
return (self.records[num])[val]

callback = ProfilersCallback()

query_api = self.client.query_api(query_options=QueryOptions(profilers=["query", "operator"],
profiler_callback=callback))
query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

self.assertEqual("profiler/query", callback.get_record(0, "_measurement"))
self.assertEqual("profiler/operator", callback.get_record(1, "_measurement"))

def test_profiler_ast(self):

expect = {
Expand Down