Skip to content

Commit

Permalink
fix: fixed serialization of DataFrame with empty (NaN) values, fixed …
Browse files Browse the repository at this point in the history
…escaping whitespaces, fixed order of tags (#123)
  • Loading branch information
rolincova authored Jul 13, 2020
1 parent f74d183 commit d2472f4
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
### Bug Fixes
1. [#117](https://github.com/influxdata/influxdb-client-python/pull/117): Fixed appending default tags for single Point
1. [#115](https://github.com/influxdata/influxdb-client-python/pull/115): Fixed serialization of `\n`, `\r` and `\t` to Line Protocol, `=` is valid sign for measurement name
1. [#118](https://github.com/influxdata/influxdb-client-python/issues/118): Fixed serialization of DataFrame with empty (NaN) values

## 1.8.0 [2020-06-19]

Expand Down
103 changes: 103 additions & 0 deletions influxdb_client/client/write/dataframe_serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import re
from functools import reduce
from itertools import chain

from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_MEASUREMENT

"""
Functions for serializing a Pandas DataFrame.
Much of the code here is inspired by that in the aioinflux package found here: https://github.com/gusutabopb/aioinflux
"""


def _replace(data_frame):
    """Build regex (pattern, replacement) pairs that strip NaN fields from a serialized line.

    The serializer first renders every column of a row, so rows containing NaN
    produce tokens such as ``key=nan`` (string columns) or ``key=nani``/``key=nan``
    (numeric columns). The returned replacements delete those tokens and then
    collapse the commas/spaces left behind.

    :param data_frame: the ``pandas.DataFrame`` being serialized; only its
        column names and dtypes are inspected
    :return: list of ``(pattern, replacement)`` tuples to apply in order via ``re.sub``
    """
    from ...extras import np

    # Columns holding Python objects (typically strings).
    obj_cols = {k for k, v in dict(data_frame.dtypes).items() if v is np.dtype('O')}

    # Numeric/boolean columns; an integer field carries a trailing ``i``,
    # hence the optional ``i?`` in the pattern.
    other_cols = set(data_frame.columns) - obj_cols

    # Keys appear in the serialized line translated via _ESCAPE_KEY and may
    # contain regex metacharacters, so escape them before building the pattern.
    obj_nans = (f'{re.escape(str(k).translate(_ESCAPE_KEY))}=nan' for k in obj_cols)
    other_nans = (f'{re.escape(str(k).translate(_ESCAPE_KEY))}=nani?' for k in other_cols)

    replacements = [
        ('|'.join(chain(obj_nans, other_nans)), ''),
        (',{2,}', ','),                          # collapse runs of commas left by removed fields
        ('|'.join([', ,', ', ', ' ,']), ' '),    # tidy comma/space combinations around separators
    ]

    return replacements


def _itertuples(data_frame):
cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))]
return zip(data_frame.index, *cols)


def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
    """Serialize a Pandas DataFrame into a list of InfluxDB line-protocol strings.

    The DataFrame index supplies the per-row timestamp (localized to UTC when
    naive); columns listed in ``data_frame_tag_columns`` become tags, all other
    columns become fields typed by their dtype.

    :param data_frame: the ``pandas.DataFrame`` to serialize
    :param point_settings: ``PointSettings`` whose ``defaultTags`` are added to
        every row as extra tag columns
    :key data_frame_measurement_name: required measurement name
    :key data_frame_tag_columns: optional iterable of column names to emit as tags
    :return: list of line-protocol strings, one per DataFrame row
    :raises TypeError: when ``data_frame`` is not a DataFrame or the
        measurement name is missing
    """
    from ...extras import pd, np
    if not isinstance(data_frame, pd.DataFrame):
        raise TypeError('Must be DataFrame, but type was: {0}.'
                        .format(type(data_frame)))

    if 'data_frame_measurement_name' not in kwargs:
        raise TypeError('"data_frame_measurement_name" is a Required Argument')

    # Normalize the index to timezone-aware timestamps so that ``p[0].value``
    # below yields epoch nanoseconds.
    if isinstance(data_frame.index, pd.PeriodIndex):
        data_frame.index = data_frame.index.to_timestamp()
    else:
        data_frame.index = pd.to_datetime(data_frame.index)

    if data_frame.index.tzinfo is None:
        data_frame.index = data_frame.index.tz_localize('UTC')

    measurement_name = str(kwargs.get('data_frame_measurement_name')).translate(_ESCAPE_MEASUREMENT)
    data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
    data_frame_tag_columns = set(data_frame_tag_columns or [])

    tags = []
    fields = []
    keys = []

    # Default tags become ordinary columns so they take part in serialization.
    if point_settings.defaultTags:
        for key, value in point_settings.defaultTags.items():
            data_frame[key] = value
            data_frame_tag_columns.add(key)

    # Build one f-string fragment per column; ``p`` is a row tuple produced by
    # ``_itertuples`` (index first, hence the ``index + 1`` offsets).
    for index, (key, value) in enumerate(data_frame.dtypes.items()):
        key = str(key)
        keys.append(key.translate(_ESCAPE_KEY))
        key_format = f'{{keys[{index}]}}'

        if key in data_frame_tag_columns:
            tags.append({'key': key, 'value': f"{key_format}={{str(p[{index + 1}]).translate(_ESCAPE_KEY)}}"})
        elif issubclass(value.type, np.integer):
            fields.append(f"{key_format}={{p[{index + 1}]}}i")
        elif issubclass(value.type, (np.floating, np.bool_)):
            # ``np.floating`` replaces the ``np.float`` alias removed in
            # NumPy >= 1.24; it also covers float32 columns, which previously
            # fell through to the quoted-string branch.
            fields.append(f"{key_format}={{p[{index + 1}]}}")
        else:
            fields.append(f"{key_format}=\"{{str(p[{index + 1}]).translate(_ESCAPE_KEY)}}\"")

    # Line protocol expects tag keys in lexicographic order.
    tags.sort(key=lambda x: x['key'])
    tags = ','.join(map(lambda y: y['value'], tags))

    # Compile the whole row template into a single f-string lambda.
    # NOTE(review): the template is eval'd; column names pass through
    # _ESCAPE_KEY, but characters such as braces or quotes in names could
    # still break or inject into the template — confirm upstream sanitization.
    fmt = ('{measurement_name}', f'{"," if tags else ""}', tags,
           ' ', ','.join(fields), ' {p[0].value}')
    f = eval("lambda p: f'{}'".format(''.join(fmt)),
             {'measurement_name': measurement_name, '_ESCAPE_KEY': _ESCAPE_KEY, 'keys': keys})

    # Treat empty strings in tag columns as missing so they are dropped below.
    for k, v in dict(data_frame.dtypes).items():
        if k in data_frame_tag_columns:
            data_frame[k].replace('', np.nan, inplace=True)

    isnull = data_frame.isnull().any(axis=1)

    if isnull.any():
        # Some rows contain NaN: regex-strip the empty fields afterwards.
        rep = _replace(data_frame)
        lp = (reduce(lambda a, b: re.sub(*b, a), rep, f(p))
              for p in _itertuples(data_frame))
        return list(lp)
    else:
        return list(map(f, _itertuples(data_frame)))
58 changes: 4 additions & 54 deletions influxdb_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from rx.subject import Subject

from influxdb_client import WritePrecision, WriteService
from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION, _ESCAPE_KEY
from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points
from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION
from influxdb_client.rest import ApiException

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -258,7 +259,7 @@ def _serialize(self, record, write_precision, payload, **kwargs):
self._serialize(Point.from_dict(record, write_precision=write_precision),
write_precision, payload, **kwargs)
elif 'DataFrame' in type(record).__name__:
_data = self._data_frame_to_list_of_points(record, precision=write_precision, **kwargs)
_data = data_frame_to_list_of_points(record, self._point_settings, **kwargs)
self._serialize(_data, write_precision, payload, **kwargs)

elif isinstance(record, list):
Expand All @@ -284,7 +285,7 @@ def _write_batching(self, bucket, org, data,
precision, **kwargs)

elif 'DataFrame' in type(data).__name__:
self._write_batching(bucket, org, self._data_frame_to_list_of_points(data, precision, **kwargs),
self._write_batching(bucket, org, data_frame_to_list_of_points(data, self._point_settings, **kwargs),
precision, **kwargs)

elif isinstance(data, list):
Expand All @@ -306,57 +307,6 @@ def _append_default_tag(self, key, val, record):
for item in record:
self._append_default_tag(key, val, item)

def _itertuples(self, data_frame):
cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))]
return zip(data_frame.index, *cols)

    def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs):
        """Serialize a DataFrame into line-protocol strings, one per row.

        NOTE(review): this is the pre-refactor implementation being removed in
        this diff, superseded by
        ``influxdb_client.client.write.dataframe_serializer.data_frame_to_list_of_points``.

        :param data_frame: the ``pandas.DataFrame`` to serialize; its index
            supplies the per-row timestamp (localized to UTC when naive)
        :param precision: accepted but not used in this body — TODO confirm
        :key data_frame_measurement_name: required measurement name
        :key data_frame_tag_columns: optional iterable of column names to emit as tags
        :raises TypeError: when ``data_frame`` is not a DataFrame or the
            measurement name is missing
        """
        from ..extras import pd, np
        if not isinstance(data_frame, pd.DataFrame):
            raise TypeError('Must be DataFrame, but type was: {0}.'
                            .format(type(data_frame)))

        if 'data_frame_measurement_name' not in kwargs:
            raise TypeError('"data_frame_measurement_name" is a Required Argument')

        # Normalize the index to timezone-aware timestamps so that
        # ``p[0].value`` below yields epoch nanoseconds.
        if isinstance(data_frame.index, pd.PeriodIndex):
            data_frame.index = data_frame.index.to_timestamp()
        else:
            data_frame.index = pd.to_datetime(data_frame.index)

        if data_frame.index.tzinfo is None:
            data_frame.index = data_frame.index.tz_localize('UTC')

        # NOTE(review): measurement name is not escaped here (fixed in the
        # replacement serializer via _ESCAPE_MEASUREMENT).
        measurement_name = kwargs.get('data_frame_measurement_name')
        data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
        data_frame_tag_columns = set(data_frame_tag_columns or [])

        tags = []
        fields = []

        # Default tags become ordinary columns so they take part in serialization.
        if self._point_settings.defaultTags:
            for key, value in self._point_settings.defaultTags.items():
                data_frame[key] = value
                data_frame_tag_columns.add(key)

        # Build one f-string fragment per column; ``p`` is a row tuple from
        # ``self._itertuples`` (index first, hence ``index + 1`` offsets).
        for index, (key, value) in enumerate(data_frame.dtypes.items()):
            key = str(key).translate(_ESCAPE_KEY)

            if key in data_frame_tag_columns:
                tags.append(f"{key}={{p[{index + 1}].translate(_ESCAPE_KEY)}}")
            elif issubclass(value.type, np.integer):
                fields.append(f"{key}={{p[{index + 1}]}}i")
            # NOTE(review): ``np.float`` is a deprecated alias removed in
            # NumPy >= 1.24; ``np.floating`` would also cover float32 columns.
            elif issubclass(value.type, (np.float, np.bool_)):
                fields.append(f"{key}={{p[{index + 1}]}}")
            else:
                fields.append(f"{key}=\"{{p[{index + 1}].translate(_ESCAPE_KEY)}}\"")

        # Compile the row template into a single f-string lambda; the eval'd
        # code resolves _ESCAPE_KEY from this module's globals.
        fmt = (f'{measurement_name}', f'{"," if tags else ""}', ','.join(tags),
               ' ', ','.join(fields), ' {p[0].value}')
        f = eval("lambda p: f'{}'".format(''.join(fmt)))

        return list(map(f, self._itertuples(data_frame)))

def _http(self, batch_item: _BatchItem):

logger.debug("Write time series data into InfluxDB: %s", batch_item)
Expand Down
130 changes: 129 additions & 1 deletion tests/test_WriteApiDataFrame.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from datetime import timedelta

from influxdb_client import InfluxDBClient, WriteOptions, WriteApi
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points
from influxdb_client.client.write_api import SYNCHRONOUS, PointSettings
from tests.base_test import BaseTest


Expand Down Expand Up @@ -86,3 +87,130 @@ def test_write_num_py(self):
self.assertEqual(result[0].records[1].get_value(), 200.0)

pass

    def test_write_nan(self):
        """NaN field values must be dropped from each serialized row, not emitted."""
        from influxdb_client.extras import pd, np

        now = pd.Timestamp('2020-04-05 00:00+00:00')

        data_frame = pd.DataFrame(data=[[3.1955, np.nan, 20.514305, np.nan],
                                        [5.7310, np.nan, 23.328710, np.nan],
                                        [np.nan, 3.138664, np.nan, 20.755026],
                                        [5.7310, 5.139563, 23.328710, 19.791240]],
                                  index=[now, now + timedelta(minutes=30), now + timedelta(minutes=60),
                                         now + timedelta(minutes=90)],
                                  columns=["actual_kw_price", "forecast_kw_price", "actual_general_use",
                                           "forecast_general_use"])

        points = data_frame_to_list_of_points(data_frame=data_frame, point_settings=PointSettings(),
                                              data_frame_measurement_name='measurement')

        self.assertEqual(4, len(points))
        self.assertEqual("measurement actual_kw_price=3.1955,actual_general_use=20.514305 1586044800000000000",
                         points[0])
        self.assertEqual("measurement actual_kw_price=5.731,actual_general_use=23.32871 1586046600000000000",
                         points[1])
        self.assertEqual("measurement forecast_kw_price=3.138664,forecast_general_use=20.755026 1586048400000000000",
                         points[2])
        self.assertEqual("measurement actual_kw_price=5.731,forecast_kw_price=5.139563,actual_general_use=23.32871,"
                         "forecast_general_use=19.79124 1586050200000000000",
                         points[3])

    def test_write_tag_nan(self):
        """Empty-string or NaN tag values must be omitted together with their tag key."""
        from influxdb_client.extras import pd, np

        now = pd.Timestamp('2020-04-05 00:00+00:00')

        data_frame = pd.DataFrame(data=[["", 3.1955, 20.514305],
                                        ['', 5.7310, 23.328710],
                                        [np.nan, 5.7310, 23.328710],
                                        ["tag", 3.138664, 20.755026]],
                                  index=[now, now + timedelta(minutes=30),
                                         now + timedelta(minutes=60), now + timedelta(minutes=90)],
                                  columns=["tag", "actual_kw_price", "forecast_kw_price"])

        write_api = self.client.write_api(write_options=SYNCHRONOUS, point_settings=PointSettings())

        points = data_frame_to_list_of_points(data_frame=data_frame,
                                              point_settings=PointSettings(),
                                              data_frame_measurement_name='measurement',
                                              data_frame_tag_columns={"tag"})

        self.assertEqual(4, len(points))
        self.assertEqual("measurement actual_kw_price=3.1955,forecast_kw_price=20.514305 1586044800000000000",
                         points[0])
        self.assertEqual("measurement actual_kw_price=5.731,forecast_kw_price=23.32871 1586046600000000000",
                         points[1])
        self.assertEqual("measurement actual_kw_price=5.731,forecast_kw_price=23.32871 1586048400000000000",
                         points[2])
        self.assertEqual("measurement,tag=tag actual_kw_price=3.138664,forecast_kw_price=20.755026 1586050200000000000",
                         points[3])

        # Explicit teardown of the write API created above.
        write_api.__del__()

    def test_escaping_measurement(self):
        """Whitespace and newlines in the measurement name must be escaped in the output."""
        from influxdb_client.extras import pd, np

        now = pd.Timestamp('2020-04-05 00:00+00:00')

        data_frame = pd.DataFrame(data=[["coyote_creek", np.int64(100.5)], ["coyote_creek", np.int64(200)]],
                                  index=[now + timedelta(hours=1), now + timedelta(hours=2)],
                                  columns=["location", "water_level"])

        points = data_frame_to_list_of_points(data_frame=data_frame,
                                              point_settings=PointSettings(),
                                              data_frame_measurement_name='measu rement',
                                              data_frame_tag_columns={"tag"})

        self.assertEqual(2, len(points))
        self.assertEqual("measu\\ rement location=\"coyote_creek\",water_level=100i 1586048400000000000",
                         points[0])
        self.assertEqual("measu\\ rement location=\"coyote_creek\",water_level=200i 1586052000000000000",
                         points[1])

        points = data_frame_to_list_of_points(data_frame=data_frame,
                                              point_settings=PointSettings(),
                                              data_frame_measurement_name='measu\nrement2',
                                              data_frame_tag_columns={"tag"})

        self.assertEqual(2, len(points))
        self.assertEqual("measu\\nrement2 location=\"coyote_creek\",water_level=100i 1586048400000000000",
                         points[0])
        self.assertEqual("measu\\nrement2 location=\"coyote_creek\",water_level=200i 1586052000000000000",
                         points[1])

    def test_tag_escaping_key_and_value(self):
        """Control characters (\\n, \\r, \\t) in tag keys and values must be escaped."""
        from influxdb_client.extras import pd, np

        now = pd.Timestamp('2020-04-05 00:00+00:00')

        data_frame = pd.DataFrame(data=[["carriage\nreturn", "new\nline", "t\tab", np.int64(2)], ],
                                  index=[now + timedelta(hours=1), ],
                                  columns=["carriage\rreturn", "new\nline", "t\tab", "l\ne\rv\tel"])

        points = data_frame_to_list_of_points(data_frame=data_frame,
                                              point_settings=PointSettings(),
                                              data_frame_measurement_name='h\n2\ro\t_data',
                                              data_frame_tag_columns={"new\nline", "carriage\rreturn", "t\tab"})

        self.assertEqual(1, len(points))
        self.assertEqual(
            "h\\n2\\ro\\t_data,carriage\\rreturn=carriage\\nreturn,new\\nline=new\\nline,t\\tab=t\\tab l\\ne\\rv\\tel=2i 1586048400000000000",
            points[0])

    def test_tags_order(self):
        """Tags must be serialized in lexicographic key order regardless of column order."""
        from influxdb_client.extras import pd, np

        now = pd.Timestamp('2020-04-05 00:00+00:00')

        data_frame = pd.DataFrame(data=[["c", "a", "b", np.int64(2)], ],
                                  index=[now + timedelta(hours=1), ],
                                  columns=["c", "a", "b", "level"])

        points = data_frame_to_list_of_points(data_frame=data_frame,
                                              point_settings=PointSettings(),
                                              data_frame_measurement_name='h2o',
                                              data_frame_tag_columns={"c", "a", "b"})

        self.assertEqual(1, len(points))
        self.assertEqual("h2o,a=a,b=b,c=c level=2i 1586048400000000000", points[0])

0 comments on commit d2472f4

Please sign in to comment.