Skip to content

Commit

Permalink
columnar argumentum for client.execute
Browse files Browse the repository at this point in the history
  • Loading branch information
kszucs committed Oct 11, 2017
1 parent 1f22e5e commit 26b3927
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 29 deletions.
3 changes: 3 additions & 0 deletions src/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ def check_rows(self, data):
if len(row) != expected_row_len:
raise ValueError('Different rows length')

def get_columns(self):
return self.data

def get_rows(self):
if not self.data:
return self.data
Expand Down
55 changes: 26 additions & 29 deletions src/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,19 @@ def __init__(self, *args, **kwargs):
def disconnect(self):
self.connection.disconnect()

def receive_result(self, with_column_types=False, progress=False):
def receive_result(self, with_column_types=False, progress=False,
columnar=False):
result = QueryResult(with_column_types=with_column_types)

if progress:
progress_gen = self.receive_progress_result(result)
progress_gen = self.receive_progress_result(result, columnar)
return Progress(self.connection, result, progress_gen)

else:
self.receive_no_progress_result(result)
self.receive_no_progress_result(result, columnar)
return result.get_result()

def receive_progress_result(self, result):
def receive_progress_result(self, result, columnar=False):
rows_read, approx_rows_to_read = 0, 0
while True:
packet = self.receive_packet()
Expand All @@ -93,9 +94,9 @@ def receive_progress_result(self, result):
yield rows_read, approx_rows_to_read

else:
self.store_query_result(packet, result)
self.store_query_result(packet, result, columnar)

def receive_no_progress_result(self, result):
def receive_no_progress_result(self, result, columnar=False):
while True:
packet = self.receive_packet()
if not packet:
Expand All @@ -104,16 +105,19 @@ def receive_no_progress_result(self, result):
if packet is True:
continue

self.store_query_result(packet, result)
self.store_query_result(packet, result, columnar)

def store_query_result(self, packet, result):
def store_query_result(self, packet, result, columnar=False):
block = getattr(packet, 'block', None)
if block is None:
return

# Header block contains no rows. Pick columns from it.
if block.rows:
result.data.extend(block.get_rows())
if columnar:
result.data.extend(block.get_columns())
else:
result.data.extend(block.get_rows())
elif not result.columns_with_types:
result.columns_with_types = block.columns_with_types

Expand Down Expand Up @@ -143,7 +147,7 @@ def receive_packet(self):

def execute(self, query, params=None, with_column_types=False,
external_tables=None, query_id=None, settings=None,
types_check=False):
types_check=False, columnar=False):
self.connection.force_connect()

try:
Expand All @@ -159,7 +163,7 @@ def execute(self, query, params=None, with_column_types=False,
query, with_column_types=with_column_types,
external_tables=external_tables,
query_id=query_id, settings=settings,
types_check=types_check
types_check=types_check, columnar=columnar
)

except Exception:
Expand All @@ -184,35 +188,28 @@ def execute_with_progress(self, query, with_column_types=False,

def process_ordinary_query_with_progress(
self, query, with_column_types=False, external_tables=None,
query_id=None, settings=None, types_check=False):
self.connection.send_query(
query,
query_id=query_id, settings=settings
)
query_id=None, settings=None, types_check=False, columnar=False):
self.connection.send_query(query, query_id=query_id, settings=settings)
self.connection.send_external_tables(external_tables,
types_check=types_check)
return self.receive_result(
with_column_types=with_column_types, progress=True
)
return self.receive_result(with_column_types=with_column_types,
progress=True, columnar=columnar)

def process_ordinary_query(self, query, with_column_types=False,
external_tables=None, query_id=None,
settings=None, types_check=False):
self.connection.send_query(
query,
query_id=query_id, settings=settings
)
settings=None, types_check=False,
columnar=False):
self.connection.send_query(query, query_id=query_id, settings=settings)
self.connection.send_external_tables(external_tables,
types_check=types_check)
return self.receive_result(with_column_types=with_column_types)
return self.receive_result(with_column_types=with_column_types,
columnar=columnar)

def process_insert_query(self, query_without_data, data,
external_tables=None, query_id=None,
settings=None, types_check=False):
self.connection.send_query(
query_without_data,
query_id=query_id, settings=settings
)
self.connection.send_query(query_without_data, query_id=query_id,
settings=settings)
self.connection.send_external_tables(external_tables,
types_check=types_check)

Expand Down
16 changes: 16 additions & 0 deletions tests/test_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@


class BlocksTestCase(BaseTestCase):

def test_return_totals_extremes(self):
rv = self.client.execute(
'SELECT a, sum(b + a) FROM ('
Expand All @@ -26,6 +27,21 @@ def test_return_totals_extremes(self):
(1, 10)
])

def test_columnar_result(self):
rv = self.client.execute(
'SELECT a, sum(b + a) FROM ('
'SELECT arrayJoin(range(3)) - 1 AS a,'
'arrayJoin(range(4)) AS b'
') AS t '
'GROUP BY a '
'ORDER BY a',
columnar=True
)
self.assertEqual(rv, [
(-1, 0, 1),
(2, 6, 10)
])

def test_select_with_column_types(self):
rv = self.client.execute(
'SELECT CAST(1 AS Int32) AS x', with_column_types=True
Expand Down

0 comments on commit 26b3927

Please sign in to comment.