diff --git a/CHANGELOG.md b/CHANGELOG.md index a362adc5..68a23061 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ ### Bug Fixes 1. [#117](https://github.com/influxdata/influxdb-client-python/pull/117): Fixed appending default tags for single Point 1. [#115](https://github.com/influxdata/influxdb-client-python/pull/115): Fixed serialization of `\n`, `\r` and `\t` to Line Protocol, `=` is valid sign for measurement name +1. [#118](https://github.com/influxdata/influxdb-client-python/issues/118): Fixed serialization of DataFrame with empty (NaN) values ## 1.8.0 [2020-06-19] diff --git a/influxdb_client/client/write/dataframe_serializer.py b/influxdb_client/client/write/dataframe_serializer.py new file mode 100644 index 00000000..e8fac4bb --- /dev/null +++ b/influxdb_client/client/write/dataframe_serializer.py @@ -0,0 +1,103 @@ +import re +from functools import reduce +from itertools import chain + +from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_MEASUREMENT + +""" +Functions for serializing Pandas DataFrames. +Much of the code here is inspired by that in the aioinflux package found here: https://github.com/gusutabopb/aioinflux +""" + + +def _replace(data_frame): + from ...extras import np + + # string columns + obj_cols = {k for k, v in dict(data_frame.dtypes).items() if v is np.dtype('O')} + + # number columns + other_cols = set(data_frame.columns) - obj_cols + + obj_nans = (f'{k}=nan' for k in obj_cols) + other_nans = (f'{k}=nani?' 
for k in other_cols) + + replacements = [ + ('|'.join(chain(obj_nans, other_nans)), ''), + (',{2,}', ','), + ('|'.join([', ,', ', ', ' ,']), ' '), + ] + + return replacements + + +def _itertuples(data_frame): + cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))] + return zip(data_frame.index, *cols) + + +def data_frame_to_list_of_points(data_frame, point_settings, **kwargs): + from ...extras import pd, np + if not isinstance(data_frame, pd.DataFrame): + raise TypeError('Must be DataFrame, but type was: {0}.' + .format(type(data_frame))) + + if 'data_frame_measurement_name' not in kwargs: + raise TypeError('"data_frame_measurement_name" is a Required Argument') + + if isinstance(data_frame.index, pd.PeriodIndex): + data_frame.index = data_frame.index.to_timestamp() + else: + data_frame.index = pd.to_datetime(data_frame.index) + + if data_frame.index.tzinfo is None: + data_frame.index = data_frame.index.tz_localize('UTC') + + measurement_name = str(kwargs.get('data_frame_measurement_name')).translate(_ESCAPE_MEASUREMENT) + data_frame_tag_columns = kwargs.get('data_frame_tag_columns') + data_frame_tag_columns = set(data_frame_tag_columns or []) + + tags = [] + fields = [] + keys = [] + + if point_settings.defaultTags: + for key, value in point_settings.defaultTags.items(): + data_frame[key] = value + data_frame_tag_columns.add(key) + + for index, (key, value) in enumerate(data_frame.dtypes.items()): + key = str(key) + keys.append(key.translate(_ESCAPE_KEY)) + key_format = f'{{keys[{index}]}}' + + if key in data_frame_tag_columns: + tags.append({'key': key, 'value': f"{key_format}={{str(p[{index + 1}]).translate(_ESCAPE_KEY)}}"}) + elif issubclass(value.type, np.integer): + fields.append(f"{key_format}={{p[{index + 1}]}}i") + elif issubclass(value.type, (np.float, np.bool_)): + fields.append(f"{key_format}={{p[{index + 1}]}}") + else: + fields.append(f"{key_format}=\"{{str(p[{index + 1}]).translate(_ESCAPE_KEY)}}\"") + + tags.sort(key=lambda x: 
x['key']) + tags = ','.join(map(lambda y: y['value'], tags)) + + fmt = (f'{{measurement_name}}', f'{"," if tags else ""}', tags, + ' ', ','.join(fields), ' {p[0].value}') + f = eval("lambda p: f'{}'".format(''.join(fmt)), + {'measurement_name': measurement_name, '_ESCAPE_KEY': _ESCAPE_KEY, 'keys': keys}) + + for k, v in dict(data_frame.dtypes).items(): + if k in data_frame_tag_columns: + data_frame[k].replace('', np.nan, inplace=True) + + isnull = data_frame.isnull().any(axis=1) + + if isnull.any(): + rep = _replace(data_frame) + lp = (reduce(lambda a, b: re.sub(*b, a), rep, f(p)) + for p in _itertuples(data_frame)) + return list(lp) + else: + return list(map(f, _itertuples(data_frame))) \ No newline at end of file diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index cf71fa97..75b7d114 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -14,7 +14,8 @@ from rx.subject import Subject from influxdb_client import WritePrecision, WriteService -from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION, _ESCAPE_KEY +from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points +from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION from influxdb_client.rest import ApiException logger = logging.getLogger(__name__) @@ -258,7 +259,7 @@ def _serialize(self, record, write_precision, payload, **kwargs): self._serialize(Point.from_dict(record, write_precision=write_precision), write_precision, payload, **kwargs) elif 'DataFrame' in type(record).__name__: - _data = self._data_frame_to_list_of_points(record, precision=write_precision, **kwargs) + _data = data_frame_to_list_of_points(record, self._point_settings, **kwargs) self._serialize(_data, write_precision, payload, **kwargs) elif isinstance(record, list): @@ -284,7 +285,7 @@ def _write_batching(self, bucket, org, data, precision, **kwargs) elif 'DataFrame' in 
type(data).__name__: - self._write_batching(bucket, org, self._data_frame_to_list_of_points(data, precision, **kwargs), + self._write_batching(bucket, org, data_frame_to_list_of_points(data, self._point_settings, **kwargs), precision, **kwargs) elif isinstance(data, list): @@ -306,57 +307,6 @@ def _append_default_tag(self, key, val, record): for item in record: self._append_default_tag(key, val, item) - def _itertuples(self, data_frame): - cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))] - return zip(data_frame.index, *cols) - - def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs): - from ..extras import pd, np - if not isinstance(data_frame, pd.DataFrame): - raise TypeError('Must be DataFrame, but type was: {0}.' - .format(type(data_frame))) - - if 'data_frame_measurement_name' not in kwargs: - raise TypeError('"data_frame_measurement_name" is a Required Argument') - - if isinstance(data_frame.index, pd.PeriodIndex): - data_frame.index = data_frame.index.to_timestamp() - else: - data_frame.index = pd.to_datetime(data_frame.index) - - if data_frame.index.tzinfo is None: - data_frame.index = data_frame.index.tz_localize('UTC') - - measurement_name = kwargs.get('data_frame_measurement_name') - data_frame_tag_columns = kwargs.get('data_frame_tag_columns') - data_frame_tag_columns = set(data_frame_tag_columns or []) - - tags = [] - fields = [] - - if self._point_settings.defaultTags: - for key, value in self._point_settings.defaultTags.items(): - data_frame[key] = value - data_frame_tag_columns.add(key) - - for index, (key, value) in enumerate(data_frame.dtypes.items()): - key = str(key).translate(_ESCAPE_KEY) - - if key in data_frame_tag_columns: - tags.append(f"{key}={{p[{index + 1}].translate(_ESCAPE_KEY)}}") - elif issubclass(value.type, np.integer): - fields.append(f"{key}={{p[{index + 1}]}}i") - elif issubclass(value.type, (np.float, np.bool_)): - fields.append(f"{key}={{p[{index + 1}]}}") - else: - 
fields.append(f"{key}=\"{{p[{index + 1}].translate(_ESCAPE_KEY)}}\"") - - fmt = (f'{measurement_name}', f'{"," if tags else ""}', ','.join(tags), - ' ', ','.join(fields), ' {p[0].value}') - f = eval("lambda p: f'{}'".format(''.join(fmt))) - - return list(map(f, self._itertuples(data_frame))) - def _http(self, batch_item: _BatchItem): logger.debug("Write time series data into InfluxDB: %s", batch_item) diff --git a/tests/test_WriteApiDataFrame.py b/tests/test_WriteApiDataFrame.py index 01f52c95..69dc29e9 100644 --- a/tests/test_WriteApiDataFrame.py +++ b/tests/test_WriteApiDataFrame.py @@ -5,7 +5,8 @@ from datetime import timedelta from influxdb_client import InfluxDBClient, WriteOptions, WriteApi -from influxdb_client.client.write_api import SYNCHRONOUS +from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points +from influxdb_client.client.write_api import SYNCHRONOUS, PointSettings from tests.base_test import BaseTest @@ -86,3 +87,130 @@ def test_write_num_py(self): self.assertEqual(result[0].records[1].get_value(), 200.0) pass + + def test_write_nan(self): + from influxdb_client.extras import pd, np + + now = pd.Timestamp('2020-04-05 00:00+00:00') + + data_frame = pd.DataFrame(data=[[3.1955, np.nan, 20.514305, np.nan], + [5.7310, np.nan, 23.328710, np.nan], + [np.nan, 3.138664, np.nan, 20.755026], + [5.7310, 5.139563, 23.328710, 19.791240]], + index=[now, now + timedelta(minutes=30), now + timedelta(minutes=60), + now + timedelta(minutes=90)], + columns=["actual_kw_price", "forecast_kw_price", "actual_general_use", + "forecast_general_use"]) + + points = data_frame_to_list_of_points(data_frame=data_frame, point_settings=PointSettings(), + data_frame_measurement_name='measurement') + + self.assertEqual(4, len(points)) + self.assertEqual("measurement actual_kw_price=3.1955,actual_general_use=20.514305 1586044800000000000", + points[0]) + self.assertEqual("measurement actual_kw_price=5.731,actual_general_use=23.32871 
1586046600000000000", + points[1]) + self.assertEqual("measurement forecast_kw_price=3.138664,forecast_general_use=20.755026 1586048400000000000", + points[2]) + self.assertEqual("measurement actual_kw_price=5.731,forecast_kw_price=5.139563,actual_general_use=23.32871," + "forecast_general_use=19.79124 1586050200000000000", + points[3]) + + def test_write_tag_nan(self): + from influxdb_client.extras import pd, np + + now = pd.Timestamp('2020-04-05 00:00+00:00') + + data_frame = pd.DataFrame(data=[["", 3.1955, 20.514305], + ['', 5.7310, 23.328710], + [np.nan, 5.7310, 23.328710], + ["tag", 3.138664, 20.755026]], + index=[now, now + timedelta(minutes=30), + now + timedelta(minutes=60), now + timedelta(minutes=90)], + columns=["tag", "actual_kw_price", "forecast_kw_price"]) + + write_api = self.client.write_api(write_options=SYNCHRONOUS, point_settings=PointSettings()) + + points = data_frame_to_list_of_points(data_frame=data_frame, + point_settings=PointSettings(), + data_frame_measurement_name='measurement', + data_frame_tag_columns={"tag"}) + + self.assertEqual(4, len(points)) + self.assertEqual("measurement actual_kw_price=3.1955,forecast_kw_price=20.514305 1586044800000000000", + points[0]) + self.assertEqual("measurement actual_kw_price=5.731,forecast_kw_price=23.32871 1586046600000000000", + points[1]) + self.assertEqual("measurement actual_kw_price=5.731,forecast_kw_price=23.32871 1586048400000000000", + points[2]) + self.assertEqual("measurement,tag=tag actual_kw_price=3.138664,forecast_kw_price=20.755026 1586050200000000000", + points[3]) + + write_api.__del__() + + def test_escaping_measurement(self): + from influxdb_client.extras import pd, np + + now = pd.Timestamp('2020-04-05 00:00+00:00') + + data_frame = pd.DataFrame(data=[["coyote_creek", np.int64(100.5)], ["coyote_creek", np.int64(200)]], + index=[now + timedelta(hours=1), now + timedelta(hours=2)], + columns=["location", "water_level"]) + + points = data_frame_to_list_of_points(data_frame=data_frame, 
+ point_settings=PointSettings(), + data_frame_measurement_name='measu rement', + data_frame_tag_columns={"tag"}) + + self.assertEqual(2, len(points)) + self.assertEqual("measu\\ rement location=\"coyote_creek\",water_level=100i 1586048400000000000", + points[0]) + self.assertEqual("measu\\ rement location=\"coyote_creek\",water_level=200i 1586052000000000000", + points[1]) + + points = data_frame_to_list_of_points(data_frame=data_frame, + point_settings=PointSettings(), + data_frame_measurement_name='measu\nrement2', + data_frame_tag_columns={"tag"}) + + self.assertEqual(2, len(points)) + self.assertEqual("measu\\nrement2 location=\"coyote_creek\",water_level=100i 1586048400000000000", + points[0]) + self.assertEqual("measu\\nrement2 location=\"coyote_creek\",water_level=200i 1586052000000000000", + points[1]) + + def test_tag_escaping_key_and_value(self): + from influxdb_client.extras import pd, np + + now = pd.Timestamp('2020-04-05 00:00+00:00') + + data_frame = pd.DataFrame(data=[["carriage\nreturn", "new\nline", "t\tab", np.int64(2)], ], + index=[now + timedelta(hours=1), ], + columns=["carriage\rreturn", "new\nline", "t\tab", "l\ne\rv\tel"]) + + points = data_frame_to_list_of_points(data_frame=data_frame, + point_settings=PointSettings(), + data_frame_measurement_name='h\n2\ro\t_data', + data_frame_tag_columns={"new\nline", "carriage\rreturn", "t\tab"}) + + self.assertEqual(1, len(points)) + self.assertEqual( + "h\\n2\\ro\\t_data,carriage\\rreturn=carriage\\nreturn,new\\nline=new\\nline,t\\tab=t\\tab l\\ne\\rv\\tel=2i 1586048400000000000", + points[0]) + + def test_tags_order(self): + from influxdb_client.extras import pd, np + + now = pd.Timestamp('2020-04-05 00:00+00:00') + + data_frame = pd.DataFrame(data=[["c", "a", "b", np.int64(2)], ], + index=[now + timedelta(hours=1), ], + columns=["c", "a", "b", "level"]) + + points = data_frame_to_list_of_points(data_frame=data_frame, + point_settings=PointSettings(), + data_frame_measurement_name='h2o', + 
data_frame_tag_columns={"c", "a", "b"}) + + self.assertEqual(1, len(points)) + self.assertEqual("h2o,a=a,b=b,c=c level=2i 1586048400000000000", points[0])