Skip to content

Commit

Permalink
feat: Add on-fly parsing of response to pandas (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar committed Nov 13, 2019
1 parent 6407789 commit 9e96c58
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 26 deletions.
38 changes: 33 additions & 5 deletions influxdb_client/client/flux_csv_parser.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import base64
import codecs
import csv as csv_parser
from enum import Enum

import ciso8601
from pandas import DataFrame
from urllib3 import HTTPResponse

from influxdb_client.client.flux_table import FluxTable, FluxColumn, FluxRecord
Expand All @@ -18,12 +20,18 @@ class FluxCsvParserException(Exception):
pass


class FluxSerializationMode(Enum):
    """Strategy used by ``FluxCsvParser`` to materialize parsed query results.

    - ``tables``: accumulate records into ``FluxTable`` objects on ``parser.tables``.
    - ``stream``: yield individual ``FluxRecord`` objects as they are parsed.
    - ``dataFrame``: build a pandas ``DataFrame`` per Flux table and yield it.
    """
    tables = 1
    stream = 2
    dataFrame = 3


class FluxCsvParser(object):

def __init__(self, response: HTTPResponse, stream: bool) -> None:
def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode) -> None:
self._response = response
self.tables = []
self._stream = stream
self._serialization_mode = serialization_mode
pass

def __enter__(self):
Expand Down Expand Up @@ -64,6 +72,11 @@ def _parse_flux_response(self):
token = csv[0]
# start new table
if "#datatype" == token:

# Return already parsed DataFrame
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_dataFrame'):
yield self._dataFrame

start_new_table = True
table = FluxTable()
self._insert_table(table, table_index)
Expand All @@ -86,6 +99,12 @@ def _parse_flux_response(self):
if start_new_table:
self.add_column_names_and_tags(table, csv)
start_new_table = False
# Create DataFrame with default values
if self._serialization_mode is FluxSerializationMode.dataFrame:
self._dataFrame = DataFrame(data=[], columns=[], index=None)
for column in table.columns:
self._dataFrame[column.label] = column.default_value
pass
continue

# to int converions todo
Expand All @@ -101,14 +120,23 @@ def _parse_flux_response(self):

flux_record = self.parse_record(table_index - 1, table, csv)

if not self._stream:
if self._serialization_mode is FluxSerializationMode.tables:
self.tables[table_index - 1].records.append(flux_record)

yield flux_record
if self._serialization_mode is FluxSerializationMode.stream:
yield flux_record

if self._serialization_mode is FluxSerializationMode.dataFrame:
self._dataFrame.loc[len(self._dataFrame.index)] = flux_record.values
pass

# debug
# print(flux_record)

# Return latest DataFrame
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_dataFrame'):
yield self._dataFrame

def parse_record(self, table_index, table, csv):
record = FluxRecord(table_index)

Expand Down Expand Up @@ -180,5 +208,5 @@ def add_column_names_and_tags(table, csv):
i += 1

def _insert_table(self, table, table_index):
if not self._stream:
if self._serialization_mode is FluxSerializationMode.tables:
self.tables.insert(table_index, table)
34 changes: 13 additions & 21 deletions influxdb_client/client/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
import csv
from typing import List, Generator, Any

from pandas import DataFrame

from influxdb_client import Dialect
from influxdb_client import Query, QueryService
from influxdb_client.client.flux_csv_parser import FluxCsvParser
from influxdb_client.client.flux_csv_parser import FluxCsvParser, FluxSerializationMode
from influxdb_client.client.flux_table import FluxTable, FluxRecord


Expand Down Expand Up @@ -70,7 +68,7 @@ def query(self, query: str, org=None) -> List['FluxTable']:
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, stream=False)
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables)

list(_parser.generator())

Expand All @@ -90,13 +88,14 @@ def query_stream(self, query: str, org=None) -> Generator['FluxRecord', Any, Non
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, stream=True)
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream)

return _parser.generator()

def query_data_frame(self, query: str, org=None):
"""
Synchronously executes the Flux query and return Pandas DataFrame
Synchronously executes the Flux query and return Pandas DataFrame.
        Note that if a query returns more than one table, the client generates a DataFrame for each of them.
:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
Expand All @@ -105,23 +104,16 @@ def query_data_frame(self, query: str, org=None):
if org is None:
org = self._influxdb_client.org

flux_tables = self.query(query=query, org=org)

if len(flux_tables) == 0:
return DataFrame
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
async_req=False, _preload_content=False, _return_http_data_only=False)

if len(flux_tables) > 1:
raise Exception("Flux query result must contain one table.")
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame)
_dataFrames = list(_parser.generator())

table = flux_tables[0]
data = []
column_names = list(map(lambda c: c.label, table.columns))
for record in table:
row = []
for column_name in column_names:
row.append(record[column_name])
data.append(row)
return DataFrame(data=data, columns=column_names, index=None)
if len(_dataFrames) == 1:
return _dataFrames[0]
else:
return _dataFrames

# private helper for c
@staticmethod
Expand Down
163 changes: 163 additions & 0 deletions tests/test_QueryApiDataFrame.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import httpretty
from pandas import DataFrame
from pandas._libs.tslibs.timestamps import Timestamp

from influxdb_client import InfluxDBClient
from tests.base_test import BaseTest


class QueryDataFrameApi(BaseTest):
    """Tests for ``query_data_frame``: Flux CSV responses parsed into pandas DataFrames."""

    # Column labels that every mocked Flux response below produces, in order.
    _EXPECTED_COLUMNS = ["result", "table", "_start", "_stop", "_time",
                         "_value", "_field", "_measurement", "host"]

    def setUp(self) -> None:
        """Enable HTTPretty so HTTP calls are served from the mocked registry."""
        super(QueryDataFrameApi, self).setUp()
        # https://github.com/gabrielfalcao/HTTPretty/issues/368
        import warnings
        for _category, _message in ((ResourceWarning, "unclosed.*"),
                                    (PendingDeprecationWarning, "isAlive*")):
            warnings.filterwarnings("ignore", category=_category, message=_message)

        httpretty.enable()
        httpretty.reset()

    def tearDown(self) -> None:
        """Dispose the client first, then stop intercepting HTTP traffic."""
        self.client.__del__()
        httpretty.disable()

    def test_one_table(self):
        """A single Flux table is returned as a single DataFrame."""
        query_response = \
            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
            '#group,false,false,true,true,false,false,true,true,true\n' \
            '#default,_result,,,,,,,,\n' \
            ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,11125907456,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,11127103488,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,11127291904,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,11126190080,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,11127832576,used,mem,mac.local\n' \
            '\n\n'

        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body=query_response)

        self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False)

        flux_query = ('from(bucket: "my-bucket") '
                      '|> range(start: -5s, stop: now()) '
                      '|> filter(fn: (r) => r._measurement == "mem") '
                      '|> filter(fn: (r) => r._field == "used")')
        _dataFrame = self.client.query_api().query_data_frame(flux_query, "my-org")

        self.assertEqual(DataFrame, type(_dataFrame))
        self.assertListEqual(self._EXPECTED_COLUMNS, list(_dataFrame.columns))
        self.assertListEqual([0, 1, 2, 3, 4], list(_dataFrame.index))
        self.assertEqual(5, len(_dataFrame))

        # Expected _value per row; the other columns are constant across rows
        # except _time, which advances by one second per row.
        expected_values = [11125907456, 11127103488, 11127291904, 11126190080, 11127832576]
        for row in range(5):
            self.assertEqual("_result", _dataFrame['result'][row])
            self.assertEqual(0, _dataFrame['table'][row])
            self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][row])
            self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][row])
            self.assertEqual(Timestamp('2019-11-12 08:09:0{}+0000'.format(row + 5)), _dataFrame['_time'][row])
            self.assertEqual(expected_values[row], _dataFrame['_value'][row])
            self.assertEqual('used', _dataFrame['_field'][row])
            self.assertEqual('mem', _dataFrame['_measurement'][row])
            self.assertEqual('mac.local', _dataFrame['host'][row])

    def test_more_table(self):
        """Three Flux tables are returned as a list of three DataFrames."""
        query_response = \
            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
            '#group,false,false,true,true,false,false,true,true,true\n' \
            '#default,_result,,,,,,,,\n' \
            ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,11125907456,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,11127103488,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,11127291904,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,11126190080,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,11127832576,used,mem,mac.local\n' \
            '\n\n' \
            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
            '#group,false,false,true,true,false,false,true,true,true\n' \
            '#default,_result,,,,,,,,\n' \
            ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,6053961728,available,mem,mac.local\n' \
            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,6052765696,available,mem,mac.local\n' \
            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,6052577280,available,mem,mac.local\n' \
            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,6053679104,available,mem,mac.local\n' \
            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,6052036608,available,mem,mac.local\n' \
            '\n\n' \
            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
            '#group,false,false,true,true,false,false,true,true,true\n' \
            '#default,_result,,,,,,,,\n' \
            ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,18632704,free,mem,mac.local\n' \
            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,17420288,free,mem,mac.local\n' \
            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,17256448,free,mem,mac.local\n' \
            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,18362368,free,mem,mac.local\n' \
            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,16723968,free,mem,mac.local\n\n'

        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body=query_response)

        self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False)

        flux_query = ('from(bucket: "my-bucket") '
                      '|> range(start: -5s, stop: now()) '
                      '|> filter(fn: (r) => r._measurement == "mem") '
                      '|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")')
        _dataFrames = self.client.query_api().query_data_frame(flux_query, "my-org")

        self.assertEqual(list, type(_dataFrames))
        self.assertEqual(len(_dataFrames), 3)

        # Each of the three tables must yield a 5-row frame with the same schema.
        for _frame in _dataFrames:
            self.assertListEqual(self._EXPECTED_COLUMNS, list(_frame.columns))
            self.assertListEqual([0, 1, 2, 3, 4], list(_frame.index))
            self.assertEqual(5, len(_frame))

0 comments on commit 9e96c58

Please sign in to comment.