diff --git a/influxdb_client/client/flux_csv_parser.py b/influxdb_client/client/flux_csv_parser.py index 530115ab..5b4d55d7 100644 --- a/influxdb_client/client/flux_csv_parser.py +++ b/influxdb_client/client/flux_csv_parser.py @@ -1,8 +1,10 @@ import base64 import codecs import csv as csv_parser +from enum import Enum import ciso8601 +from pandas import DataFrame from urllib3 import HTTPResponse from influxdb_client.client.flux_table import FluxTable, FluxColumn, FluxRecord @@ -18,12 +20,18 @@ class FluxCsvParserException(Exception): pass +class FluxSerializationMode(Enum): + tables = 1 + stream = 2 + dataFrame = 3 + + class FluxCsvParser(object): - def __init__(self, response: HTTPResponse, stream: bool) -> None: + def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode) -> None: self._response = response self.tables = [] - self._stream = stream + self._serialization_mode = serialization_mode pass def __enter__(self): @@ -64,6 +72,11 @@ def _parse_flux_response(self): token = csv[0] # start new table if "#datatype" == token: + + # Return already parsed DataFrame + if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_dataFrame'): + yield self._dataFrame + start_new_table = True table = FluxTable() self._insert_table(table, table_index) @@ -86,6 +99,12 @@ def _parse_flux_response(self): if start_new_table: self.add_column_names_and_tags(table, csv) start_new_table = False + # Create DataFrame with default values + if self._serialization_mode is FluxSerializationMode.dataFrame: + self._dataFrame = DataFrame(data=[], columns=[], index=None) + for column in table.columns: + self._dataFrame[column.label] = column.default_value + pass continue # to int converions todo @@ -101,14 +120,23 @@ def _parse_flux_response(self): flux_record = self.parse_record(table_index - 1, table, csv) - if not self._stream: + if self._serialization_mode is FluxSerializationMode.tables: self.tables[table_index - 1].records.append(flux_record) - yield flux_record + if self._serialization_mode is FluxSerializationMode.stream: + yield flux_record + + if self._serialization_mode is FluxSerializationMode.dataFrame: + self._dataFrame.loc[len(self._dataFrame.index)] = flux_record.values + pass # debug # print(flux_record) + # Return latest DataFrame + if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_dataFrame'): + yield self._dataFrame + def parse_record(self, table_index, table, csv): record = FluxRecord(table_index) @@ -180,5 +208,5 @@ def add_column_names_and_tags(table, csv): i += 1 def _insert_table(self, table, table_index): - if not self._stream: + if self._serialization_mode is FluxSerializationMode.tables: self.tables.insert(table_index, table) diff --git a/influxdb_client/client/query_api.py b/influxdb_client/client/query_api.py index 63a36dd3..de0ce83e 100644 --- a/influxdb_client/client/query_api.py +++ b/influxdb_client/client/query_api.py @@ -2,11 +2,9 @@ import csv from typing import List, Generator, Any -from pandas import DataFrame - from influxdb_client import Dialect from influxdb_client import Query, QueryService -from influxdb_client.client.flux_csv_parser import FluxCsvParser +from influxdb_client.client.flux_csv_parser import FluxCsvParser, FluxSerializationMode from influxdb_client.client.flux_table import FluxTable, FluxRecord @@ -70,7 +68,7 @@ def query(self, query: str, org=None) -> List['FluxTable']: response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect), async_req=False, _preload_content=False, _return_http_data_only=False) - _parser = FluxCsvParser(response=response, stream=False) + _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables) list(_parser.generator()) @@ -90,13 +88,14 @@ def query_stream(self, query: str, org=None) -> Generator['FluxRecord', Any, Non response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect), async_req=False, _preload_content=False, _return_http_data_only=False) - _parser = FluxCsvParser(response=response, stream=True) + _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream) return _parser.generator() def query_data_frame(self, query: str, org=None): """ - Synchronously executes the Flux query and return Pandas DataFrame + Synchronously executes the Flux query and return Pandas DataFrame. + Note that if a query returns more then one table than the client generates a dataframe for each of them. :param query: the Flux query :param org: organization name (optional if already specified in InfluxDBClient) @@ -105,23 +104,16 @@ def query_data_frame(self, query: str, org=None): if org is None: org = self._influxdb_client.org - flux_tables = self.query(query=query, org=org) - - if len(flux_tables) == 0: - return DataFrame + response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect), + async_req=False, _preload_content=False, _return_http_data_only=False) - if len(flux_tables) > 1: - raise Exception("Flux query result must contain one table.") + _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame) + _dataFrames = list(_parser.generator()) - table = flux_tables[0] - data = [] - column_names = list(map(lambda c: c.label, table.columns)) - for record in table: - row = [] - for column_name in column_names: - row.append(record[column_name]) - data.append(row) - return DataFrame(data=data, columns=column_names, index=None) + if len(_dataFrames) == 1: + return _dataFrames[0] + else: + return _dataFrames # private helper for c @staticmethod diff --git a/tests/test_QueryApiDataFrame.py b/tests/test_QueryApiDataFrame.py new file mode 100644 index 00000000..85e3f329 --- /dev/null +++ b/tests/test_QueryApiDataFrame.py @@ -0,0 +1,163 @@ +import httpretty +from pandas import DataFrame +from pandas._libs.tslibs.timestamps import Timestamp + +from influxdb_client import InfluxDBClient +from tests.base_test import BaseTest + + +class QueryDataFrameApi(BaseTest): + + def setUp(self) -> None: + super(QueryDataFrameApi, self).setUp() + # https://github.com/gabrielfalcao/HTTPretty/issues/368 + import warnings + warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*") + warnings.filterwarnings("ignore", category=PendingDeprecationWarning, message="isAlive*") + + httpretty.enable() + httpretty.reset() + + def tearDown(self) -> None: + self.client.__del__() + httpretty.disable() + + def test_one_table(self): + query_response = \ + '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \ + '#group,false,false,true,true,false,false,true,true,true\n' \ + '#default,_result,,,,,,,,\n' \ + ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \ + ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,11125907456,used,mem,mac.local\n' \ + ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,11127103488,used,mem,mac.local\n' \ + ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,11127291904,used,mem,mac.local\n' \ + ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,11126190080,used,mem,mac.local\n' \ + ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,11127832576,used,mem,mac.local\n' \ + '\n\n' + + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body=query_response) + + self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False) + + _dataFrame = self.client.query_api().query_data_frame( + 'from(bucket: "my-bucket") ' + '|> range(start: -5s, stop: now()) ' + '|> filter(fn: (r) => r._measurement == "mem") ' + '|> filter(fn: (r) => r._field == "used")', + "my-org") + + self.assertEqual(DataFrame, type(_dataFrame)) + self.assertListEqual( + ["result", "table", "_start", "_stop", "_time", "_value", "_field", "_measurement", "host"], + list(_dataFrame.columns)) + self.assertListEqual([0, 1, 2, 3, 4], list(_dataFrame.index)) + self.assertEqual(5, len(_dataFrame)) + self.assertEqual("_result", _dataFrame['result'][0]) + self.assertEqual("_result", _dataFrame['result'][1]) + self.assertEqual("_result", _dataFrame['result'][2]) + self.assertEqual("_result", _dataFrame['result'][3]) + self.assertEqual("_result", _dataFrame['result'][4]) + self.assertEqual(0, _dataFrame['table'][0], None) + self.assertEqual(0, _dataFrame['table'][1], None) + self.assertEqual(0, _dataFrame['table'][2], None) + self.assertEqual(0, _dataFrame['table'][3], None) + self.assertEqual(0, _dataFrame['table'][4], None) + self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][0]) + self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][1]) + self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][2]) + self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][3]) + self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][4]) + self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][0]) + self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][1]) + self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][2]) + self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][3]) + self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][4]) + self.assertEqual(Timestamp('2019-11-12 08:09:05+0000'), _dataFrame['_time'][0]) + self.assertEqual(Timestamp('2019-11-12 08:09:06+0000'), _dataFrame['_time'][1]) + self.assertEqual(Timestamp('2019-11-12 08:09:07+0000'), _dataFrame['_time'][2]) + self.assertEqual(Timestamp('2019-11-12 08:09:08+0000'), _dataFrame['_time'][3]) + self.assertEqual(Timestamp('2019-11-12 08:09:09+0000'), _dataFrame['_time'][4]) + self.assertEqual(11125907456, _dataFrame['_value'][0]) + self.assertEqual(11127103488, _dataFrame['_value'][1]) + self.assertEqual(11127291904, _dataFrame['_value'][2]) + self.assertEqual(11126190080, _dataFrame['_value'][3]) + self.assertEqual(11127832576, _dataFrame['_value'][4]) + self.assertEqual('used', _dataFrame['_field'][0]) + self.assertEqual('used', _dataFrame['_field'][1]) + self.assertEqual('used', _dataFrame['_field'][2]) + self.assertEqual('used', _dataFrame['_field'][3]) + self.assertEqual('used', _dataFrame['_field'][4]) + self.assertEqual('mem', _dataFrame['_measurement'][0]) + self.assertEqual('mem', _dataFrame['_measurement'][1]) + self.assertEqual('mem', _dataFrame['_measurement'][2]) + self.assertEqual('mem', _dataFrame['_measurement'][3]) + self.assertEqual('mem', _dataFrame['_measurement'][4]) + self.assertEqual('mac.local', _dataFrame['host'][0]) + self.assertEqual('mac.local', _dataFrame['host'][1]) + self.assertEqual('mac.local', _dataFrame['host'][2]) + self.assertEqual('mac.local', _dataFrame['host'][3]) + self.assertEqual('mac.local', _dataFrame['host'][4]) + + def test_more_table(self): + query_response = \ + '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \ + '#group,false,false,true,true,false,false,true,true,true\n' \ + '#default,_result,,,,,,,,\n' \ + ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \ + ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,11125907456,used,mem,mac.local\n' \ + ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,11127103488,used,mem,mac.local\n' \ + ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,11127291904,used,mem,mac.local\n' \ + ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,11126190080,used,mem,mac.local\n' \ + ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,11127832576,used,mem,mac.local\n' \ + '\n\n' \ + '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \ + '#group,false,false,true,true,false,false,true,true,true\n' \ + '#default,_result,,,,,,,,\n' \ + ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \ + ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,6053961728,available,mem,mac.local\n' \ + ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,6052765696,available,mem,mac.local\n' \ + ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,6052577280,available,mem,mac.local\n' \ + ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,6053679104,available,mem,mac.local\n' \ + ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,6052036608,available,mem,mac.local\n' \ + '\n\n' \ + '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \ + '#group,false,false,true,true,false,false,true,true,true\n' \ + '#default,_result,,,,,,,,\n' \ + ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \ + ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,18632704,free,mem,mac.local\n' \ + ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,17420288,free,mem,mac.local\n' \ + ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,17256448,free,mem,mac.local\n' \ + ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,18362368,free,mem,mac.local\n' \ + ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,16723968,free,mem,mac.local\n\n' + + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body=query_response) + + self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False) + + _dataFrames = self.client.query_api().query_data_frame( + 'from(bucket: "my-bucket") ' + '|> range(start: -5s, stop: now()) ' + '|> filter(fn: (r) => r._measurement == "mem") ' + '|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")', + "my-org") + + self.assertEqual(list, type(_dataFrames)) + self.assertEqual(len(_dataFrames), 3) + + self.assertListEqual( + ["result", "table", "_start", "_stop", "_time", "_value", "_field", "_measurement", "host"], + list(_dataFrames[0].columns)) + self.assertListEqual([0, 1, 2, 3, 4], list(_dataFrames[0].index)) + self.assertEqual(5, len(_dataFrames[0])) + + self.assertListEqual( + ["result", "table", "_start", "_stop", "_time", "_value", "_field", "_measurement", "host"], + list(_dataFrames[1].columns)) + self.assertListEqual([0, 1, 2, 3, 4], list(_dataFrames[1].index)) + self.assertEqual(5, len(_dataFrames[1])) + + self.assertListEqual( + ["result", "table", "_start", "_stop", "_time", "_value", "_field", "_measurement", "host"], + list(_dataFrames[2].columns)) + self.assertListEqual([0, 1, 2, 3, 4], list(_dataFrames[2].index)) + self.assertEqual(5, len(_dataFrames[2]))