Skip to content
This repository has been archived by the owner on Oct 29, 2024. It is now read-only.

Commit

Permalink
Fix chunked query to return chunk resultsets (#753)
Browse files Browse the repository at this point in the history
When querying large data sets, it's vital to get chunked responses to
manage memory usage. Wrapping the query response in a generator and
streaming the request provides the desired result.
It also fixes `InfluxDBClient.query()` behavior for chunked queries that
is currently not working according to
[specs](https://github.com/influxdata/influxdb-python/blob/master/influxdb/client.py#L429)

Closes #585.
Closes #531.
Closes #538.
  • Loading branch information
hrbonz authored Apr 10, 2020
1 parent d6192a7 commit c903d73
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 29 deletions.
10 changes: 7 additions & 3 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def switch_user(self, username, password):
self._username = username
self._password = password

def request(self, url, method='GET', params=None, data=None,
def request(self, url, method='GET', params=None, data=None, stream=False,
expected_response_code=200, headers=None):
"""Make a HTTP request to the InfluxDB API.
Expand All @@ -261,6 +261,8 @@ def request(self, url, method='GET', params=None, data=None,
:type params: dict
:param data: the data of the request, defaults to None
:type data: str
:param stream: True if a query uses chunked responses
:type stream: bool
:param expected_response_code: the expected response code of
the request, defaults to 200
:type expected_response_code: int
Expand Down Expand Up @@ -312,6 +314,7 @@ def request(self, url, method='GET', params=None, data=None,
auth=(self._username, self._password),
params=params,
data=data,
stream=stream,
headers=headers,
proxies=self._proxies,
verify=self._verify_ssl,
Expand Down Expand Up @@ -398,17 +401,17 @@ def write(self, data, params=None, expected_response_code=204,

@staticmethod
def _read_chunked_response(response, raise_errors=True):
    """Lazily parse a chunked query response into ResultSet objects.

    :param response: a streamed ``requests`` response whose body is a
        sequence of newline-delimited JSON documents (one per chunk)
    :param raise_errors: passed through to each ``ResultSet``; when True,
        a ResultSet raises on server-reported errors
    :return: a generator yielding one ``ResultSet`` per response line,
        so large results are consumed incrementally instead of being
        accumulated in memory
    """
    for line in response.iter_lines():
        # iter_lines may yield bytes depending on the transport; the
        # JSON parser needs text.
        if isinstance(line, bytes):
            line = line.decode('utf-8')
        data = json.loads(line)
        # A fresh result set per chunk: each yielded ResultSet holds
        # only that chunk's series, keeping memory bounded.
        result_set = {}
        for result in data.get('results', []):
            for _key in result:
                if isinstance(result[_key], list):
                    result_set.setdefault(
                        _key, []).extend(result[_key])
        yield ResultSet(result_set, raise_errors=raise_errors)

def query(self,
query,
Expand Down Expand Up @@ -499,6 +502,7 @@ def query(self,
method=method,
params=params,
data=None,
stream=chunked,
expected_response_code=expected_response_code
)

Expand Down
44 changes: 18 additions & 26 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1400,40 +1400,32 @@ def test_invalid_port_fails(self):
def test_chunked_response(self):
    """Test chunked response for TestInfluxDBClient object.

    The mocked body contains two newline-delimited JSON documents, so a
    chunked query must yield exactly two ResultSet objects, one per line.
    """
    example_response = \
        u'{"results":[{"statement_id":0,"series":[{"columns":["key"],' \
        '"values":[["cpu"],["memory"],["iops"],["network"]],"partial":' \
        'true}],"partial":true}]}\n{"results":[{"statement_id":0,' \
        '"series":[{"columns":["key"],"values":[["qps"],["uptime"],' \
        '["df"],["mount"]]}]}]}\n'

    with requests_mock.Mocker() as m:
        m.register_uri(
            requests_mock.GET,
            "http://localhost:8086/query",
            text=example_response
        )
        response = self.cli.query('show series',
                                  chunked=True, chunk_size=4)
        # A chunked query returns a generator; materialize it to count
        # the per-chunk ResultSets.
        res = list(response)
        self.assertTrue(len(res) == 2)
        self.assertEqual(res[0].__repr__(), ResultSet(
            {'series': [{
                'columns': ['key'],
                'values': [['cpu'], ['memory'], ['iops'], ['network']]
            }]}).__repr__())
        self.assertEqual(res[1].__repr__(), ResultSet(
            {'series': [{
                'columns': ['key'],
                'values': [['qps'], ['uptime'], ['df'], ['mount']]
            }]}).__repr__())


class FakeClient(InfluxDBClient):
Expand Down

0 comments on commit c903d73

Please sign in to comment.