Skip to content

Commit

Permalink
fix: Fixed serialization of DataFrame with empty (NaN) values #118
Browse files Browse the repository at this point in the history
  • Loading branch information
rolincova committed Jul 13, 2020
1 parent fe71d6f commit 2ebf5a3
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
9 changes: 6 additions & 3 deletions influxdb_client/client/write/dataframe_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,18 @@ def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
key_format = f'{{keys[{index}]}}'

if key in data_frame_tag_columns:
tags.append(f"{key_format}={{str(p[{index + 1}]).translate(_ESCAPE_KEY)}}")
tags.append({'key': key, 'value': f"{key_format}={{str(p[{index + 1}]).translate(_ESCAPE_KEY)}}"})
elif issubclass(value.type, np.integer):
fields.append(f"{key_format}={{p[{index + 1}]}}i")
elif issubclass(value.type, (np.float, np.bool_)):
fields.append(f"{key_format}={{p[{index + 1}]}}")
else:
fields.append(f"{key_format}=\"{{str(p[{index + 1}]).translate(_ESCAPE_KEY)}}\"")

fmt = (f'{{measurement_name}}', f'{"," if tags else ""}', ','.join(tags),
tags.sort(key=lambda x: x['key'])
tags = ','.join(map(lambda y: y['value'], tags))

fmt = (f'{{measurement_name}}', f'{"," if tags else ""}', tags,
' ', ','.join(fields), ' {p[0].value}')
f = eval("lambda p: f'{}'".format(''.join(fmt)),
{'measurement_name': measurement_name, '_ESCAPE_KEY': _ESCAPE_KEY, 'keys': keys})
Expand All @@ -97,4 +100,4 @@ def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
for p in _itertuples(data_frame))
return list(lp)
else:
return list(map(f, _itertuples(data_frame)))
return list(map(f, _itertuples(data_frame)))
17 changes: 17 additions & 0 deletions tests/test_WriteApiDataFrame.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,20 @@ def test_tag_escaping_key_and_value(self):
self.assertEqual(
"h\\n2\\ro\\t_data,carriage\\rreturn=carriage\\nreturn,new\\nline=new\\nline,t\\tab=t\\tab l\\ne\\rv\\tel=2i 1586048400000000000",
points[0])

def test_tags_order(self):
from influxdb_client.extras import pd, np

now = pd.Timestamp('2020-04-05 00:00+00:00')

data_frame = pd.DataFrame(data=[["c", "a", "b", np.int64(2)], ],
index=[now + timedelta(hours=1), ],
columns=["c", "a", "b", "level"])

points = data_frame_to_list_of_points(data_frame=data_frame,
point_settings=PointSettings(),
data_frame_measurement_name='h2o',
data_frame_tag_columns={"c", "a", "b"})

self.assertEqual(1, len(points))
self.assertEqual("h2o,a=a,b=b,c=c level=2i 1586048400000000000", points[0])

0 comments on commit 2ebf5a3

Please sign in to comment.