This repository has been archived by the owner on Oct 29, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 521
/
line_protocol.py
138 lines (118 loc) · 3.83 KB
/
line_protocol.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from calendar import timegm
from copy import copy
from datetime import datetime
from numbers import Integral
from dateutil.parser import parse
from six import binary_type, text_type, integer_types
def _convert_timestamp(timestamp, precision=None):
if isinstance(timestamp, Integral):
return timestamp # assume precision is correct if timestamp is int
if isinstance(_get_unicode(timestamp), text_type):
timestamp = parse(timestamp)
if isinstance(timestamp, datetime):
ns = (
timegm(timestamp.utctimetuple()) * 1e9 +
timestamp.microsecond * 1e3
)
if precision is None or precision == 'n':
return ns
elif precision == 'u':
return ns / 1e3
elif precision == 'ms':
return ns / 1e6
elif precision == 's':
return ns / 1e9
elif precision == 'm':
return ns / 1e9 / 60
elif precision == 'h':
return ns / 1e9 / 3600
raise ValueError(timestamp)
def _escape_tag(tag):
tag = _get_unicode(tag, force=True)
return tag.replace(
"\\", "\\\\"
).replace(
" ", "\\ "
).replace(
",", "\\,"
).replace(
"=", "\\="
)
def _escape_value(value):
value = _get_unicode(value)
if isinstance(value, text_type) and value != '':
return "\"{0}\"".format(
value.replace(
"\"", "\\\""
).replace(
"\n", "\\n"
)
)
elif isinstance(value, integer_types) and not isinstance(value, bool):
return str(value) + 'i'
else:
return str(value)
def _get_unicode(data, force=False):
"""
Try to return a text aka unicode object from the given data.
"""
if isinstance(data, binary_type):
return data.decode('utf-8')
elif data is None:
return ''
elif force:
return str(data)
else:
return data
def make_lines(data, precision=None):
"""
Extracts the points from the given dict and returns a Unicode string
matching the line protocol introduced in InfluxDB 0.9.0.
"""
lines = []
static_tags = data.get('tags', None)
for point in data['points']:
elements = []
# add measurement name
measurement = _escape_tag(_get_unicode(
point.get('measurement', data.get('measurement'))
))
key_values = [measurement]
# add tags
if static_tags is None:
tags = point.get('tags', {})
else:
tags = copy(static_tags)
tags.update(point.get('tags', {}))
# tags should be sorted client-side to take load off server
for tag_key in sorted(tags.keys()):
key = _escape_tag(tag_key)
value = _escape_tag(tags[tag_key])
if key != '' and value != '':
key_values.append("{key}={value}".format(key=key, value=value))
key_values = ','.join(key_values)
elements.append(key_values)
# add fields
field_values = []
for field_key in sorted(point['fields'].keys()):
key = _escape_tag(field_key)
value = _escape_value(point['fields'][field_key])
if key != '' and value != '':
field_values.append("{key}={value}".format(
key=key,
value=value
))
field_values = ','.join(field_values)
elements.append(field_values)
# add timestamp
if 'time' in point:
timestamp = _get_unicode(str(int(
_convert_timestamp(point['time'], precision)
)))
elements.append(timestamp)
line = ' '.join(elements)
lines.append(line)
lines = '\n'.join(lines)
return lines + '\n'