Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fixed serialization of DataFrame with empty (NaN) values, fixed escaping whitespaces, fixed order of tags #123

Merged
merged 2 commits into from
Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
### Bug Fixes
1. [#117](https://github.com/influxdata/influxdb-client-python/pull/117): Fixed appending default tags for single Point
1. [#115](https://github.com/influxdata/influxdb-client-python/pull/115): Fixed serialization of `\n`, `\r` and `\t` to Line Protocol, `=` is valid sign for measurement name
1. [#118](https://github.com/influxdata/influxdb-client-python/issues/118): Fixed serialization of DataFrame with empty (NaN) values

## 1.8.0 [2020-06-19]

Expand Down
103 changes: 103 additions & 0 deletions influxdb_client/client/write/dataframe_serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import re
from functools import reduce
from itertools import chain

from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_MEASUREMENT

"""
Functions for serialize Pandas DataFrame.
Much of the code here is inspired by that in the aioinflux packet found here: https://github.com/gusutabopb/aioinflux
"""


def _replace(data_frame):
    """Build regex replacement rules that strip NaN fields from serialized rows.

    :param data_frame: the DataFrame whose columns determine the patterns
    :return: a list of ``(pattern, replacement)`` tuples, applied in order via
        ``re.sub`` to each serialized line-protocol row, that remove fields
        whose value is NaN and then clean up the separators left behind
    """
    from ...extras import np

    # object (string) columns serialize a missing value as `key=nan`
    obj_cols = {k for k, v in dict(data_frame.dtypes).items() if v is np.dtype('O')}

    # numeric columns serialize a missing value as `key=nan` or `key=nani`
    other_cols = set(data_frame.columns) - obj_cols

    # Keys appear in the serialized output translated via _ESCAPE_KEY, so the
    # patterns must target the escaped form; re.escape() additionally protects
    # regex metacharacters in the key and the backslashes escaping introduces.
    obj_nans = (f'{re.escape(str(k).translate(_ESCAPE_KEY))}=nan' for k in obj_cols)
    other_nans = (f'{re.escape(str(k).translate(_ESCAPE_KEY))}=nani?' for k in other_cols)

    replacements = [
        ('|'.join(chain(obj_nans, other_nans)), ''),
        # collapse runs of commas produced by removing interior fields
        (',{2,}', ','),
        # repair the field-set boundary when leading/trailing fields were removed
        ('|'.join([', ,', ', ', ' ,']), ' '),
    ]

    return replacements


def _itertuples(data_frame):
cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))]
return zip(data_frame.index, *cols)


def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
    """Serialize a Pandas DataFrame into a list of InfluxDB line-protocol strings.

    :param data_frame: the ``pandas.DataFrame`` to serialize; its index is used
        as the timestamp of each point (localized to UTC when naive).
        NOTE: the frame is modified in place (index conversion, default-tag
        columns, empty-string tag values replaced with NaN).
    :param point_settings: ``PointSettings`` whose ``defaultTags`` are appended
        to every point as additional tag columns
    :key data_frame_measurement_name: required; the measurement name
    :key data_frame_tag_columns: optional iterable of column names serialized
        as tags (in lexicographic key order); all other columns become fields
    :return: list of line-protocol strings, one per DataFrame row
    :raises TypeError: when ``data_frame`` is not a DataFrame or
        ``data_frame_measurement_name`` is missing
    """
    from ...extras import pd, np
    if not isinstance(data_frame, pd.DataFrame):
        raise TypeError('Must be DataFrame, but type was: {0}.'
                        .format(type(data_frame)))

    if 'data_frame_measurement_name' not in kwargs:
        raise TypeError('"data_frame_measurement_name" is a Required Argument')

    # Normalize the index to timezone-aware timestamps (UTC when naive).
    if isinstance(data_frame.index, pd.PeriodIndex):
        data_frame.index = data_frame.index.to_timestamp()
    else:
        data_frame.index = pd.to_datetime(data_frame.index)

    if data_frame.index.tzinfo is None:
        data_frame.index = data_frame.index.tz_localize('UTC')

    measurement_name = str(kwargs.get('data_frame_measurement_name')).translate(_ESCAPE_MEASUREMENT)
    data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
    data_frame_tag_columns = set(data_frame_tag_columns or [])

    tags = []
    fields = []
    keys = []  # escaped column keys, referenced positionally from the template

    if point_settings.defaultTags:
        for key, value in point_settings.defaultTags.items():
            data_frame[key] = value
            data_frame_tag_columns.add(key)

    # Build one f-string fragment per column; `p` is the row tuple produced by
    # _itertuples, where p[0] is the timestamp and p[i + 1] the i-th column.
    for index, (key, value) in enumerate(data_frame.dtypes.items()):
        key = str(key)
        keys.append(key.translate(_ESCAPE_KEY))
        key_format = f'{{keys[{index}]}}'

        if key in data_frame_tag_columns:
            tags.append({'key': key, 'value': f"{key_format}={{str(p[{index + 1}]).translate(_ESCAPE_KEY)}}"})
        elif issubclass(value.type, np.integer):
            fields.append(f"{key_format}={{p[{index + 1}]}}i")
        elif issubclass(value.type, (np.floating, np.bool_)):
            # np.floating replaces the deprecated `np.float` alias,
            # which was removed in NumPy 1.24
            fields.append(f"{key_format}={{p[{index + 1}]}}")
        else:
            fields.append(f"{key_format}=\"{{str(p[{index + 1}]).translate(_ESCAPE_KEY)}}\"")

    # Line protocol best practice: tags sorted lexicographically by key.
    tags.sort(key=lambda x: x['key'])
    tags = ','.join(map(lambda y: y['value'], tags))

    # Compile the per-row serializer once. The eval'd template embeds only
    # locally built format fragments; measurement name, escape table and keys
    # are passed in as variables, never spliced into the source text raw.
    fmt = ('{measurement_name}', f'{"," if tags else ""}', tags,
           ' ', ','.join(fields), ' {p[0].value}')
    f = eval("lambda p: f'{}'".format(''.join(fmt)),
             {'measurement_name': measurement_name, '_ESCAPE_KEY': _ESCAPE_KEY, 'keys': keys})

    # Empty strings in tag columns count as missing so the tag is dropped.
    for k, v in dict(data_frame.dtypes).items():
        if k in data_frame_tag_columns:
            data_frame[k].replace('', np.nan, inplace=True)

    isnull = data_frame.isnull().any(axis=1)

    if isnull.any():
        # Post-process each serialized row to strip `key=nan` artifacts.
        rep = _replace(data_frame)
        lp = (reduce(lambda a, b: re.sub(*b, a), rep, f(p))
              for p in _itertuples(data_frame))
        return list(lp)
    else:
        return list(map(f, _itertuples(data_frame)))
58 changes: 4 additions & 54 deletions influxdb_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from rx.subject import Subject

from influxdb_client import WritePrecision, WriteService
from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION, _ESCAPE_KEY
from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points
from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION
from influxdb_client.rest import ApiException

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -258,7 +259,7 @@ def _serialize(self, record, write_precision, payload, **kwargs):
self._serialize(Point.from_dict(record, write_precision=write_precision),
write_precision, payload, **kwargs)
elif 'DataFrame' in type(record).__name__:
_data = self._data_frame_to_list_of_points(record, precision=write_precision, **kwargs)
_data = data_frame_to_list_of_points(record, self._point_settings, **kwargs)
self._serialize(_data, write_precision, payload, **kwargs)

elif isinstance(record, list):
Expand All @@ -284,7 +285,7 @@ def _write_batching(self, bucket, org, data,
precision, **kwargs)

elif 'DataFrame' in type(data).__name__:
self._write_batching(bucket, org, self._data_frame_to_list_of_points(data, precision, **kwargs),
self._write_batching(bucket, org, data_frame_to_list_of_points(data, self._point_settings, **kwargs),
precision, **kwargs)

elif isinstance(data, list):
Expand All @@ -306,57 +307,6 @@ def _append_default_tag(self, key, val, record):
for item in record:
self._append_default_tag(key, val, item)

def _itertuples(self, data_frame):
cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))]
return zip(data_frame.index, *cols)

def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs):
    """Serialize a Pandas DataFrame into a list of line-protocol strings.

    Builds an f-string template (one fragment per column) and evaluates it
    per row tuple from ``_itertuples``, where ``p[0]`` is the timestamp and
    ``p[i + 1]`` the i-th column value.

    NOTE(review): known defects in this legacy implementation, fixed by
    ``dataframe_serializer.data_frame_to_list_of_points``:
    - the measurement name is not escaped before being embedded in the template
    - tags are joined in column order, not sorted by key
    - NaN values are serialized literally (e.g. ``key=nan``)
    - ``np.float`` is a deprecated alias, removed in NumPy 1.24
    - tag/field values are ``.translate``d without ``str(...)``, which fails
      for non-string column values
    """
    from ..extras import pd, np
    if not isinstance(data_frame, pd.DataFrame):
        raise TypeError('Must be DataFrame, but type was: {0}.'
                        .format(type(data_frame)))

    if 'data_frame_measurement_name' not in kwargs:
        raise TypeError('"data_frame_measurement_name" is a Required Argument')

    # Normalize the index to timezone-aware timestamps (UTC when naive).
    if isinstance(data_frame.index, pd.PeriodIndex):
        data_frame.index = data_frame.index.to_timestamp()
    else:
        data_frame.index = pd.to_datetime(data_frame.index)

    if data_frame.index.tzinfo is None:
        data_frame.index = data_frame.index.tz_localize('UTC')

    measurement_name = kwargs.get('data_frame_measurement_name')
    data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
    data_frame_tag_columns = set(data_frame_tag_columns or [])

    tags = []
    fields = []

    # Default tags become extra DataFrame columns serialized as tags.
    if self._point_settings.defaultTags:
        for key, value in self._point_settings.defaultTags.items():
            data_frame[key] = value
            data_frame_tag_columns.add(key)

    # One template fragment per column, chosen by dtype.
    for index, (key, value) in enumerate(data_frame.dtypes.items()):
        key = str(key).translate(_ESCAPE_KEY)

        if key in data_frame_tag_columns:
            tags.append(f"{key}={{p[{index + 1}].translate(_ESCAPE_KEY)}}")
        elif issubclass(value.type, np.integer):
            fields.append(f"{key}={{p[{index + 1}]}}i")
        elif issubclass(value.type, (np.float, np.bool_)):
            fields.append(f"{key}={{p[{index + 1}]}}")
        else:
            fields.append(f"{key}=\"{{p[{index + 1}].translate(_ESCAPE_KEY)}}\"")

    # Compile the per-row serializer once via eval of the assembled f-string.
    fmt = (f'{measurement_name}', f'{"," if tags else ""}', ','.join(tags),
           ' ', ','.join(fields), ' {p[0].value}')
    f = eval("lambda p: f'{}'".format(''.join(fmt)))

    return list(map(f, self._itertuples(data_frame)))

def _http(self, batch_item: _BatchItem):

logger.debug("Write time series data into InfluxDB: %s", batch_item)
Expand Down
130 changes: 129 additions & 1 deletion tests/test_WriteApiDataFrame.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from datetime import timedelta

from influxdb_client import InfluxDBClient, WriteOptions, WriteApi
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points
from influxdb_client.client.write_api import SYNCHRONOUS, PointSettings
from tests.base_test import BaseTest


Expand Down Expand Up @@ -86,3 +87,130 @@ def test_write_num_py(self):
self.assertEqual(result[0].records[1].get_value(), 200.0)

pass

def test_write_nan(self):
    """Rows containing NaN serialize without the missing fields."""
    from influxdb_client.extras import pd, np

    start = pd.Timestamp('2020-04-05 00:00+00:00')
    index = [start + timedelta(minutes=30 * step) for step in range(4)]

    frame = pd.DataFrame(data=[[3.1955, np.nan, 20.514305, np.nan],
                               [5.7310, np.nan, 23.328710, np.nan],
                               [np.nan, 3.138664, np.nan, 20.755026],
                               [5.7310, 5.139563, 23.328710, 19.791240]],
                         index=index,
                         columns=["actual_kw_price", "forecast_kw_price", "actual_general_use",
                                  "forecast_general_use"])

    points = data_frame_to_list_of_points(data_frame=frame, point_settings=PointSettings(),
                                          data_frame_measurement_name='measurement')

    self.assertEqual(4, len(points))
    expected = [
        "measurement actual_kw_price=3.1955,actual_general_use=20.514305 1586044800000000000",
        "measurement actual_kw_price=5.731,actual_general_use=23.32871 1586046600000000000",
        "measurement forecast_kw_price=3.138664,forecast_general_use=20.755026 1586048400000000000",
        "measurement actual_kw_price=5.731,forecast_kw_price=5.139563,actual_general_use=23.32871,"
        "forecast_general_use=19.79124 1586050200000000000",
    ]
    for expected_line, actual_line in zip(expected, points):
        self.assertEqual(expected_line, actual_line)

def test_write_tag_nan(self):
    """Empty-string and NaN tag values are dropped from the serialized point."""
    from influxdb_client.extras import pd, np

    now = pd.Timestamp('2020-04-05 00:00+00:00')

    data_frame = pd.DataFrame(data=[["", 3.1955, 20.514305],
                                    ['', 5.7310, 23.328710],
                                    [np.nan, 5.7310, 23.328710],
                                    ["tag", 3.138664, 20.755026]],
                              index=[now, now + timedelta(minutes=30),
                                     now + timedelta(minutes=60), now + timedelta(minutes=90)],
                              columns=["tag", "actual_kw_price", "forecast_kw_price"])

    # The serializer is exercised directly; the previously created (and unused)
    # WriteApi instance and its explicit __del__() call were removed.
    points = data_frame_to_list_of_points(data_frame=data_frame,
                                          point_settings=PointSettings(),
                                          data_frame_measurement_name='measurement',
                                          data_frame_tag_columns={"tag"})

    self.assertEqual(4, len(points))
    self.assertEqual("measurement actual_kw_price=3.1955,forecast_kw_price=20.514305 1586044800000000000",
                     points[0])
    self.assertEqual("measurement actual_kw_price=5.731,forecast_kw_price=23.32871 1586046600000000000",
                     points[1])
    self.assertEqual("measurement actual_kw_price=5.731,forecast_kw_price=23.32871 1586048400000000000",
                     points[2])
    self.assertEqual("measurement,tag=tag actual_kw_price=3.138664,forecast_kw_price=20.755026 1586050200000000000",
                     points[3])

def test_escaping_measurement(self):
    """Whitespace and control characters in measurement names are escaped."""
    from influxdb_client.extras import pd, np

    now = pd.Timestamp('2020-04-05 00:00+00:00')

    frame = pd.DataFrame(data=[["coyote_creek", np.int64(100.5)], ["coyote_creek", np.int64(200)]],
                         index=[now + timedelta(hours=1), now + timedelta(hours=2)],
                         columns=["location", "water_level"])

    def serialize(measurement):
        # helper: serialize the shared frame under the given measurement name
        return data_frame_to_list_of_points(data_frame=frame,
                                            point_settings=PointSettings(),
                                            data_frame_measurement_name=measurement,
                                            data_frame_tag_columns={"tag"})

    points = serialize('measu rement')
    self.assertEqual(2, len(points))
    self.assertEqual("measu\\ rement location=\"coyote_creek\",water_level=100i 1586048400000000000",
                     points[0])
    self.assertEqual("measu\\ rement location=\"coyote_creek\",water_level=200i 1586052000000000000",
                     points[1])

    points = serialize('measu\nrement2')
    self.assertEqual(2, len(points))
    self.assertEqual("measu\\nrement2 location=\"coyote_creek\",water_level=100i 1586048400000000000",
                     points[0])
    self.assertEqual("measu\\nrement2 location=\"coyote_creek\",water_level=200i 1586052000000000000",
                     points[1])

def test_tag_escaping_key_and_value(self):
    """Control characters in measurement, tag keys/values and field keys are escaped."""
    from influxdb_client.extras import pd, np

    timestamp = pd.Timestamp('2020-04-05 00:00+00:00') + timedelta(hours=1)

    frame = pd.DataFrame(data=[["carriage\nreturn", "new\nline", "t\tab", np.int64(2)]],
                         index=[timestamp],
                         columns=["carriage\rreturn", "new\nline", "t\tab", "l\ne\rv\tel"])

    points = data_frame_to_list_of_points(data_frame=frame,
                                          point_settings=PointSettings(),
                                          data_frame_measurement_name='h\n2\ro\t_data',
                                          data_frame_tag_columns={"new\nline", "carriage\rreturn", "t\tab"})

    self.assertEqual(1, len(points))
    self.assertEqual(
        "h\\n2\\ro\\t_data,carriage\\rreturn=carriage\\nreturn,new\\nline=new\\nline,t\\tab=t\\tab l\\ne\\rv\\tel=2i 1586048400000000000",
        points[0])

def test_tags_order(self):
    """Tags are serialized in lexicographic key order regardless of column order."""
    from influxdb_client.extras import pd, np

    timestamp = pd.Timestamp('2020-04-05 00:00+00:00') + timedelta(hours=1)

    frame = pd.DataFrame(data=[["c", "a", "b", np.int64(2)]],
                         index=[timestamp],
                         columns=["c", "a", "b", "level"])

    points = data_frame_to_list_of_points(data_frame=frame,
                                          point_settings=PointSettings(),
                                          data_frame_measurement_name='h2o',
                                          data_frame_tag_columns={"c", "a", "b"})

    self.assertEqual(1, len(points))
    self.assertEqual("h2o,a=a,b=b,c=c level=2i 1586048400000000000", points[0])