Skip to content
This repository has been archived by the owner on Oct 29, 2024. It is now read-only.

Commit

Permalink
feat(line_protocol): split out make_line function from core make_lines (
Browse files Browse the repository at this point in the history
#810)

* feat(line_protocol): split out make_line function from core make_lines

* chore(line_protocol): fix malformed testcase
  • Loading branch information
sebito91 authored Apr 8, 2020
1 parent ad5e5b6 commit 4799c58
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Changed
- Clean up stale CI config (#755)
- Add legacy client test (#752 & #318 thx @oldmantaiter & @sebito91)
- Update make_lines section in line_protocol.py to split out core function (#375 thx @aisbaa)

### Removed

Expand Down
123 changes: 73 additions & 50 deletions influxdb/line_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from pytz import UTC
from dateutil.parser import parse
from six import iteritems, binary_type, text_type, integer_types, PY2
from six import binary_type, text_type, integer_types, PY2

EPOCH = UTC.localize(datetime.utcfromtimestamp(0))

Expand All @@ -30,15 +30,20 @@ def _convert_timestamp(timestamp, precision=None):
ns = (timestamp - EPOCH).total_seconds() * 1e9
if precision is None or precision == 'n':
return ns
elif precision == 'u':

if precision == 'u':
return ns / 1e3
elif precision == 'ms':

if precision == 'ms':
return ns / 1e6
elif precision == 's':

if precision == 's':
return ns / 1e9
elif precision == 'm':

if precision == 'm':
return ns / 1e9 / 60
elif precision == 'h':

if precision == 'h':
return ns / 1e9 / 3600

raise ValueError(timestamp)
Expand Down Expand Up @@ -95,9 +100,11 @@ def _escape_value(value):

if isinstance(value, text_type) and value != '':
return quote_ident(value)
elif isinstance(value, integer_types) and not isinstance(value, bool):

if isinstance(value, integer_types) and not isinstance(value, bool):
return str(value) + 'i'
elif _is_float(value):

if _is_float(value):
return repr(value)

return str(value)
Expand All @@ -107,15 +114,60 @@ def _get_unicode(data, force=False):
"""Try to return a text aka unicode object from the given data."""
if isinstance(data, binary_type):
return data.decode('utf-8')
elif data is None:

if data is None:
return ''
elif force:

if force:
if PY2:
return unicode(data)
else:
return str(data)
else:
return data
return str(data)

return data


def make_line(measurement, tags=None, fields=None, time=None, precision=None):
"""Extract the actual point from a given measurement line."""
tags = tags or {}
fields = fields or {}

line = _escape_tag(_get_unicode(measurement))

# tags should be sorted client-side to take load off server
tag_list = []
for tag_key in sorted(tags.keys()):
key = _escape_tag(tag_key)
value = _escape_tag(tags[tag_key])

if key != '' and value != '':
tag_list.append(
"{key}={value}".format(key=key, value=value)
)

if tag_list:
line += ',' + ','.join(tag_list)

field_list = []
for field_key in sorted(fields.keys()):
key = _escape_tag(field_key)
value = _escape_value(fields[field_key])

if key != '' and value != '':
field_list.append("{key}={value}".format(
key=key,
value=value
))

if field_list:
line += ' ' + ','.join(field_list)

if time is not None:
timestamp = _get_unicode(str(int(
_convert_timestamp(time, precision)
)))
line += ' ' + timestamp

return line


def make_lines(data, precision=None):
Expand All @@ -127,48 +179,19 @@ def make_lines(data, precision=None):
lines = []
static_tags = data.get('tags')
for point in data['points']:
elements = []

# add measurement name
measurement = _escape_tag(_get_unicode(
point.get('measurement', data.get('measurement'))))
key_values = [measurement]

# add tags
if static_tags:
tags = dict(static_tags) # make a copy, since we'll modify
tags.update(point.get('tags') or {})
else:
tags = point.get('tags') or {}

# tags should be sorted client-side to take load off server
for tag_key, tag_value in sorted(iteritems(tags)):
key = _escape_tag(tag_key)
value = _escape_tag_value(tag_value)

if key != '' and value != '':
key_values.append(key + "=" + value)

elements.append(','.join(key_values))

# add fields
field_values = []
for field_key, field_value in sorted(iteritems(point['fields'])):
key = _escape_tag(field_key)
value = _escape_value(field_value)

if key != '' and value != '':
field_values.append(key + "=" + value)

elements.append(','.join(field_values))

# add timestamp
if 'time' in point:
timestamp = _get_unicode(str(int(
_convert_timestamp(point['time'], precision))))
elements.append(timestamp)

line = ' '.join(elements)
line = make_line(
point.get('measurement', data.get('measurement')),
tags=tags,
fields=point.get('fields'),
precision=precision,
time=point.get('time')
)
lines.append(line)

return '\n'.join(lines) + '\n'
2 changes: 1 addition & 1 deletion influxdb/tests/test_line_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_make_lines(self):

self.assertEqual(
line_protocol.make_lines(data),
'test,backslash_tag=C:\\\\ ,integer_tag=2,string_tag=hello '
'test,backslash_tag=C:\\\\,integer_tag=2,string_tag=hello '
'bool_val=True,float_val=1.1,int_val=1i,string_val="hello!"\n'
)

Expand Down

0 comments on commit 4799c58

Please sign in to comment.