diff --git a/CHANGELOG.md b/CHANGELOG.md index b52c9324..152b3e6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,10 @@ 1. [#378](https://github.com/influxdata/influxdb-client-python/pull/378): Correct serialization DataFrame with nan values [DataFrame] 1. [#384](https://github.com/influxdata/influxdb-client-python/pull/384): Timeout can be specified as a `float` 1. [#380](https://github.com/influxdata/influxdb-client-python/pull/380): Correct data types for querying [DataFrame] -1. [#391](https://github.com/influxdata/influxdb-client-python/pull/391): Ping function uses debug for log +1. [#391](https://github.com/influxdata/influxdb-client-python/pull/391): Ping function uses debug for log + +### Features +1. [#393](https://github.com/influxdata/influxdb-client-python/pull/393): Added callback function for getting profilers output with example and test ### CI 1. [#370](https://github.com/influxdata/influxdb-client-python/pull/370): Add Python 3.10 to CI builds diff --git a/README.rst b/README.rst index 76a0da0f..738f99e5 100644 --- a/README.rst +++ b/README.rst @@ -337,6 +337,36 @@ Example of a profiler output: DurationSum : 940500 MeanDuration : 940500.0 +You can also use a callback function to get the profilers output. +The callback is called with each profiler result as a ``FluxRecord``. + +Example of how to use profilers with a callback: + +.. code-block:: python + + class ProfilersCallback(object): + def __init__(self): + self.records = [] + + def __call__(self, flux_record): + self.records.append(flux_record.values) + + callback = ProfilersCallback() + + query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"], profiler_callback=callback)) + tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)') + + for profiler in callback.records: + print(f'Custom processing of profiler result: {profiler}') + +Example output of this callback: + +.. 
code-block:: + + Custom processing of profiler result: {'result': '_profiler', 'table': 0, '_measurement': 'profiler/query', 'TotalDuration': 18843792, 'CompileDuration': 1078666, 'QueueDuration': 93375, 'PlanDuration': 0, 'RequeueDuration': 0, 'ExecuteDuration': 17371000, 'Concurrency': 0, 'MaxAllocated': 448, 'TotalAllocated': 0, 'RuntimeErrors': None, 'flux/query-plan': 'digraph {\r\n ReadRange2\r\n generated_yield\r\n\r\n ReadRange2 -> generated_yield\r\n}\r\n\r\n', 'influxdb/scanned-bytes': 0, 'influxdb/scanned-values': 0} + Custom processing of profiler result: {'result': '_profiler', 'table': 1, '_measurement': 'profiler/operator', 'Type': '*influxdb.readFilterSource', 'Label': 'ReadRange2', 'Count': 1, 'MinDuration': 3274084, 'MaxDuration': 3274084, 'DurationSum': 3274084, 'MeanDuration': 3274084.0} + + .. marker-index-end diff --git a/examples/README.md b/examples/README.md index 53b8370f..9892a837 100644 --- a/examples/README.md +++ b/examples/README.md @@ -14,6 +14,7 @@ - [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV` - [query_from_file.py](query_from_file.py) - How to use a Flux query defined in a separate file - [query_response_to_json.py](query_response_to_json.py) - How to serialize Query response to JSON +- [query_with_profilers.py](query_with_profilers.py) - How to process profilers output with a callback ## Management API diff --git a/examples/query_with_profilers.py b/examples/query_with_profilers.py new file mode 100644 index 00000000..5275367b --- /dev/null +++ b/examples/query_with_profilers.py @@ -0,0 +1,41 @@ +from influxdb_client import InfluxDBClient, Point +from influxdb_client.client.query_api import QueryOptions +from influxdb_client.client.write_api import SYNCHRONOUS + +with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True) as client: + + """ + Define callback to process profiler results. 
+ """ + class ProfilersCallback(object): + def __init__(self): + self.records = [] + + def __call__(self, flux_record): + self.records.append(flux_record.values) + + + callback = ProfilersCallback() + + write_api = client.write_api(write_options=SYNCHRONOUS) + + """ + Prepare data + """ + _point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3) + _point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3) + write_api.write(bucket="my-bucket", record=[_point1, _point2]) + + """ + Pass callback to QueryOptions + """ + query_api = client.query_api( + query_options=QueryOptions(profilers=["query", "operator"], profiler_callback=callback)) + + """ + Perform query + """ + tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)') + + for profiler in callback.records: + print(f'Custom processing of profiler result: {profiler}') diff --git a/influxdb_client/client/flux_csv_parser.py b/influxdb_client/client/flux_csv_parser.py index a973bef5..67e08801 100644 --- a/influxdb_client/client/flux_csv_parser.py +++ b/influxdb_client/client/flux_csv_parser.py @@ -45,14 +45,15 @@ class FluxCsvParser(object): """Parse to processing response from InfluxDB to FluxStructures or DataFrame.""" def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode, - data_frame_index: List[str] = None, profilers: List[str] = None) -> None: + data_frame_index: List[str] = None, query_options=None) -> None: """Initialize defaults.""" self._response = response self.tables = [] self._serialization_mode = serialization_mode self._data_frame_index = data_frame_index self._data_frame_values = [] - self._profilers = profilers + self._profilers = query_options.profilers if query_options is not None else None + self._profiler_callback = query_options.profiler_callback if query_options is not None else None pass def __enter__(self): @@ -289,16 +290,18 @@ def table_list(self) -> List[FluxTable]: else: return 
list(filter(lambda table: not self._is_profiler_table(table), self.tables)) - @staticmethod - def _print_profiler_info(flux_record: FluxRecord): + def _print_profiler_info(self, flux_record: FluxRecord): if flux_record.get_measurement().startswith("profiler/"): - msg = "Profiler: " + flux_record.get_measurement() - print("\n" + len(msg) * "=") - print(msg) - print(len(msg) * "=") - for name in flux_record.values: - val = flux_record[name] - if isinstance(val, str) and len(val) > 50: - print(f"{name:<20}: \n\n{val}") - elif val is not None: - print(f"{name:<20}: {val:<20}") + if self._profiler_callback: + self._profiler_callback(flux_record) + else: + msg = "Profiler: " + flux_record.get_measurement() + print("\n" + len(msg) * "=") + print(msg) + print(len(msg) * "=") + for name in flux_record.values: + val = flux_record[name] + if isinstance(val, str) and len(val) > 50: + print(f"{name:<20}: \n\n{val}") + elif val is not None: + print(f"{name:<20}: {val:<20}") diff --git a/influxdb_client/client/query_api.py b/influxdb_client/client/query_api.py index 665651eb..c4e65ce4 100644 --- a/influxdb_client/client/query_api.py +++ b/influxdb_client/client/query_api.py @@ -7,7 +7,7 @@ import codecs import csv from datetime import datetime, timedelta -from typing import List, Generator, Any, Union, Iterable +from typing import List, Generator, Any, Union, Iterable, Callable from influxdb_client import Dialect, IntegerLiteral, BooleanLiteral, FloatLiteral, DateTimeLiteral, StringLiteral, \ VariableAssignment, Identifier, OptionStatement, File, DurationLiteral, Duration, UnaryExpression, Expression, \ @@ -22,13 +22,15 @@ class QueryOptions(object): """Query options.""" - def __init__(self, profilers: List[str] = None) -> None: + def __init__(self, profilers: List[str] = None, profiler_callback: Callable = None) -> None: """ Initialize query options. 
:param profilers: list of enabled flux profilers + :param profiler_callback: callback function return profilers (FluxRecord) """ self.profilers = profilers + self.profiler_callback = profiler_callback class QueryApi(object): @@ -101,7 +103,7 @@ def query(self, query: str, org=None, params: dict = None) -> List['FluxTable']: async_req=False, _preload_content=False, _return_http_data_only=False) _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables, - profilers=self._profilers()) + query_options=self._get_query_options()) list(_parser.generator()) @@ -123,7 +125,7 @@ def query_stream(self, query: str, org=None, params: dict = None) -> Generator[' response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params), async_req=False, _preload_content=False, _return_http_data_only=False) _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream, - profilers=self._profilers()) + query_options=self._get_query_options()) return _parser.generator() @@ -176,17 +178,18 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame, data_frame_index=data_frame_index, - profilers=self._profilers()) + query_options=self._get_query_options()) return _parser.generator() - def _profilers(self): + def _get_query_options(self): if self._query_options and self._query_options.profilers: - return self._query_options.profilers - else: - return self._influxdb_client.profilers + return self._query_options + elif self._influxdb_client.profilers: + return QueryOptions(profilers=self._influxdb_client.profilers) def _create_query(self, query, dialect=default_dialect, params: dict = None): - profilers = self._profilers() + query_options = self._get_query_options() + profilers = query_options.profilers if query_options is not None else None q = Query(query=query, 
dialect=dialect, extern=QueryApi._build_flux_ast(params, profilers)) if profilers: diff --git a/tests/test_QueryApi.py b/tests/test_QueryApi.py index 29375203..bda2b068 100644 --- a/tests/test_QueryApi.py +++ b/tests/test_QueryApi.py @@ -376,6 +376,27 @@ def test_query_profiler_present(self): print(f"Profiler record: {flux_record}") self.assertTrue(found_profiler_records) + def test_profilers_callback(self): + + class ProfilersCallback(object): + def __init__(self): + self.records = [] + + def __call__(self, flux_record): + self.records.append(flux_record.values) + + def get_record(self, num, val): + return (self.records[num])[val] + + callback = ProfilersCallback() + + query_api = self.client.query_api(query_options=QueryOptions(profilers=["query", "operator"], + profiler_callback=callback)) + query_api.query('from(bucket:"my-bucket") |> range(start: -10m)') + + self.assertEqual("profiler/query", callback.get_record(0, "_measurement")) + self.assertEqual("profiler/operator", callback.get_record(1, "_measurement")) + def test_profiler_ast(self): expect = {