From c903d73efcf49b4e340490072d777d8f34ac8e1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20=22hr=22=20Berder=20=28=E7=99=BD=E5=B3=B0=29?= Date: Sat, 11 Apr 2020 01:10:40 +0800 Subject: [PATCH] Fix chunked query to return chunk resultsets (#753) When querying large data sets, it's vital to get a chunked responses to manage memory usage. Wrapping the query response in a generator and streaming the request provides the desired result. It also fixes `InfluxDBClient.query()` behavior for chunked queries that is currently not working according to [specs](https://github.com/influxdata/influxdb-python/blob/master/influxdb/client.py#L429) Closes #585. Closes #531. Closes #538. --- influxdb/client.py | 10 +++++--- influxdb/tests/client_test.py | 44 ++++++++++++++--------------------- 2 files changed, 25 insertions(+), 29 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 46424bc2..b28ed1b5 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -249,7 +249,7 @@ def switch_user(self, username, password): self._username = username self._password = password - def request(self, url, method='GET', params=None, data=None, + def request(self, url, method='GET', params=None, data=None, stream=False, expected_response_code=200, headers=None): """Make a HTTP request to the InfluxDB API. @@ -261,6 +261,8 @@ def request(self, url, method='GET', params=None, data=None, :type params: dict :param data: the data of the request, defaults to None :type data: str + :param stream: True if a query uses chunked responses + :type stream: bool :param expected_response_code: the expected response code of the request, defaults to 200 :type expected_response_code: int @@ -312,6 +314,7 @@ def request(self, url, method='GET', params=None, data=None, auth=(self._username, self._password), params=params, data=data, + stream=stream, headers=headers, proxies=self._proxies, verify=self._verify_ssl, @@ -398,17 +401,17 @@ def write(self, data, params=None, expected_response_code=204, @staticmethod def _read_chunked_response(response, raise_errors=True): - result_set = {} for line in response.iter_lines(): if isinstance(line, bytes): line = line.decode('utf-8') data = json.loads(line) + result_set = {} for result in data.get('results', []): for _key in result: if isinstance(result[_key], list): result_set.setdefault( _key, []).extend(result[_key]) - return ResultSet(result_set, raise_errors=raise_errors) + yield ResultSet(result_set, raise_errors=raise_errors) def query(self, query, @@ -499,6 +502,7 @@ def query(self, method=method, params=params, data=None, + stream=chunked, expected_response_code=expected_response_code ) diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index a8f8e864..fd3c06bb 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -1400,16 +1400,11 @@ def test_invalid_port_fails(self): def test_chunked_response(self): """Test chunked reponse for TestInfluxDBClient object.""" example_response = \ - u'{"results":[{"statement_id":0,"series":' \ - '[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \ - '[["value","integer"]]}],"partial":true}]}\n{"results":' \ - '[{"statement_id":0,"series":[{"name":"iops","columns":' \ - '["fieldKey","fieldType"],"values":[["value","integer"]]}],' \ - '"partial":true}]}\n{"results":[{"statement_id":0,"series":' \ - '[{"name":"load","columns":["fieldKey","fieldType"],"values":' \ - '[["value","integer"]]}],"partial":true}]}\n{"results":' \ - '[{"statement_id":0,"series":[{"name":"memory","columns":' \ - '["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n' + u'{"results":[{"statement_id":0,"series":[{"columns":["key"],' \ + '"values":[["cpu"],["memory"],["iops"],["network"]],"partial":' \ + 'true}],"partial":true}]}\n{"results":[{"statement_id":0,' \ + '"series":[{"columns":["key"],"values":[["qps"],["uptime"],' \ + '["df"],["mount"]]}]}]}\n' with requests_mock.Mocker() as m: m.register_uri( @@ -1417,23 +1412,20 @@ def test_chunked_response(self): "http://localhost:8086/query", text=example_response ) - response = self.cli.query('show series limit 4 offset 0', + response = self.cli.query('show series', chunked=True, chunk_size=4) - self.assertTrue(len(response) == 4) - self.assertEqual(response.__repr__(), ResultSet( - {'series': [{'values': [['value', 'integer']], - 'name': 'cpu', - 'columns': ['fieldKey', 'fieldType']}, - {'values': [['value', 'integer']], - 'name': 'iops', - 'columns': ['fieldKey', 'fieldType']}, - {'values': [['value', 'integer']], - 'name': 'load', - 'columns': ['fieldKey', 'fieldType']}, - {'values': [['value', 'integer']], - 'name': 'memory', - 'columns': ['fieldKey', 'fieldType']}]} - ).__repr__()) + res = list(response) + self.assertTrue(len(res) == 2) + self.assertEqual(res[0].__repr__(), ResultSet( + {'series': [{ + 'columns': ['key'], + 'values': [['cpu'], ['memory'], ['iops'], ['network']] + }]}).__repr__()) + self.assertEqual(res[1].__repr__(), ResultSet( + {'series': [{ + 'columns': ['key'], + 'values': [['qps'], ['uptime'], ['df'], ['mount']] + }]}).__repr__()) class FakeClient(InfluxDBClient):