Optional reading into NumPy arrays
xzkostyan committed Oct 20, 2020
1 parent 09c355f commit a217f96
Showing 35 changed files with 1,738 additions and 28 deletions.
14 changes: 14 additions & 0 deletions .travis.yml
@@ -6,6 +6,7 @@ env:
- VERSION=20.5.5.74
- VERSION=20.4.9.110
- VERSION=20.3.20.6
- VERSION=20.3.20.6 USE_NUMPY=1
- VERSION=19.16.17.80
- VERSION=19.15.3.6
- VERSION=19.9.2.4 # allow_suspicious_low_cardinality_types
@@ -65,11 +66,23 @@ install:
- pip install --upgrade pip setuptools
- pip install flake8 flake8-print coveralls cython
script:
- if [ -z ${USE_NUMPY+x} ]; then pip uninstall -y numpy pandas; fi
- flake8 && coverage run --source=clickhouse_driver setup.py test
after_success:
coveralls

jobs:
# Exclude Python versions without NumPy support.
exclude:
- python: 3.4
env: VERSION=20.3.20.6 USE_NUMPY=1
- python: 3.9-dev
env: VERSION=20.3.20.6 USE_NUMPY=1
- python: pypy2.7-5.10.0
env: VERSION=20.3.20.6 USE_NUMPY=1
- python: pypy3.5
env: VERSION=20.3.20.6 USE_NUMPY=1

include:
- stage: valgrind
name: Valgrind check
@@ -100,6 +113,7 @@ jobs:

env:
- VERSION=20.3.7.46
- USE_NUMPY=1
- PYTHONMALLOC=malloc

- stage: wheels
2 changes: 1 addition & 1 deletion clickhouse_driver/__init__.py
@@ -3,7 +3,7 @@
from .dbapi import connect


-VERSION = (0, 1, 5)
+VERSION = (0, 1, 6)
__version__ = '.'.join(str(x) for x in VERSION)

__all__ = ['Client', 'connect']
55 changes: 51 additions & 4 deletions clickhouse_driver/client.py
@@ -1,3 +1,4 @@
import re
import ssl
from time import time
import types
@@ -33,12 +34,15 @@ class Client(object):
* strings_encoding -- specifies string encoding. UTF-8 by default.
* use_numpy -- use NumPy for reading columns. False by default.
"""

available_client_settings = (
'insert_block_size', # TODO: rename to max_insert_block_size
'strings_as_bytes',
-'strings_encoding'
+'strings_encoding',
+'use_numpy'
)

def __init__(self, *args, **kwargs):
@@ -53,9 +57,28 @@ def __init__(self, *args, **kwargs):
),
'strings_encoding': self.settings.pop(
'strings_encoding', defines.STRINGS_ENCODING
),
'use_numpy': self.settings.pop(
'use_numpy', False
)
}

if self.client_settings['use_numpy']:
try:
from .numpy.result import (
NumpyIterQueryResult, NumpyProgressQueryResult,
NumpyQueryResult
)
self.query_result_cls = NumpyQueryResult
self.iter_query_result_cls = NumpyIterQueryResult
self.progress_query_result_cls = NumpyProgressQueryResult
except ImportError:
raise RuntimeError('Extras for NumPy must be installed')
else:
self.query_result_cls = QueryResult
self.iter_query_result_cls = IterQueryResult
self.progress_query_result_cls = ProgressQueryResult

self.connection = Connection(*args, **kwargs)
self.connection.context.settings = self.settings
self.connection.context.client_settings = self.client_settings
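
A minimal usage sketch of the new setting, assuming the usual Client(host, settings={...}) constructor and a placeholder hostname: enabling use_numpy selects the NumPy-backed result classes wired up above, and __init__ raises RuntimeError when the NumPy/pandas extras are missing.

from clickhouse_driver import Client

# use_numpy is popped from the settings dict by __init__ above;
# it falls back to False when not given.
client = Client('localhost', settings={'use_numpy': True})
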
@@ -78,20 +101,24 @@ def receive_result(self, with_column_types=False, progress=False,
gen = self.packet_generator()

if progress:
-return ProgressQueryResult(
+return self.progress_query_result_cls(
gen, with_column_types=with_column_types, columnar=columnar
)

else:
-result = QueryResult(
+result = self.query_result_cls(
gen, with_column_types=with_column_types, columnar=columnar
)
return result.get_result()

def iter_receive_result(self, with_column_types=False):
gen = self.packet_generator()

-for rows in IterQueryResult(gen, with_column_types=with_column_types):
+result = self.iter_query_result_cls(
+    gen, with_column_types=with_column_types
+)
+
+for rows in result:
for row in rows:
yield row
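
With the NumPy result classes in place, a columnar read returns one NumPy array per column. A sketch mirroring the call that query_dataframe makes further down, assuming the client from the sketch above:

data, columns = client.execute(
    'SELECT number FROM system.numbers LIMIT 3',
    columnar=True, with_column_types=True
)
# columns -> [('number', 'UInt64')]
# data    -> e.g. [array([0, 1, 2], dtype=uint64)]
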

@@ -318,6 +345,23 @@ def execute_iter(
self.disconnect()
raise

def query_dataframe(self, query, params=None, external_tables=None,
query_id=None, settings=None):
try:
import pandas as pd
except ImportError:
raise RuntimeError('Extras for NumPy must be installed')

data, columns = self.execute(
query, columnar=True, with_column_types=True, params=params,
external_tables=external_tables, query_id=query_id,
settings=settings
)

return pd.DataFrame(
{re.sub(r'\W', '_', col[0]): d for d, col in zip(data, columns)}
)
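
A short usage sketch for query_dataframe, again assuming a use_numpy client with the pandas extra installed; note that the re.sub above replaces every non-word character in a column name with an underscore:

df = client.query_dataframe(
    'SELECT number AS n, toString(number) AS s FROM system.numbers LIMIT 3'
)
# df.columns -> ['n', 's']; a name such as count() would come back as count__
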

def process_ordinary_query_with_progress(
self, query, params=None, with_column_types=False,
external_tables=None, query_id=None,
@@ -487,6 +531,9 @@ def from_url(cls, url):
elif name == 'secure':
kwargs[name] = asbool(value)

elif name == 'use_numpy':
kwargs[name] = asbool(value)

elif name == 'client_name':
kwargs[name] = value

6 changes: 3 additions & 3 deletions clickhouse_driver/columns/floatcolumn.py
@@ -8,12 +8,12 @@ class FloatColumn(FormatColumn):
py_types = (float, ) + compat.integer_types


-class Float32(FloatColumn):
+class Float32Column(FloatColumn):
ch_type = 'Float32'
format = 'f'

def __init__(self, types_check=False, **kwargs):
-super(Float32, self).__init__(types_check=types_check, **kwargs)
+super(Float32Column, self).__init__(types_check=types_check, **kwargs)

if types_check:
# Chop only bytes that fit current type.
@@ -30,6 +30,6 @@ def before_write_items(items, nulls_map=None):
self.before_write_items = before_write_items


-class Float64(FloatColumn):
+class Float64Column(FloatColumn):
ch_type = 'Float64'
format = 'd'
Empty file.
14 changes: 14 additions & 0 deletions clickhouse_driver/columns/numpy/base.py
@@ -0,0 +1,14 @@
import numpy as np

from ..base import Column


class NumpyColumn(Column):
dtype = None

def read_items(self, n_items, buf):
data = buf.read(n_items * self.dtype.itemsize)
return np.frombuffer(data, self.dtype, n_items)

def write_items(self, items, buf):
raise RuntimeError('Write is not implemented')
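
A small self-contained illustration of the frombuffer decoding above, with io.BytesIO standing in for the driver's read buffer (both expose read(n)):

import io

import numpy as np

dtype = np.dtype(np.uint32)
# Three UInt32 values as they would arrive in a column block.
buf = io.BytesIO(np.array([1, 2, 3], dtype=np.uint32).tobytes())

items = np.frombuffer(buf.read(3 * dtype.itemsize), dtype, 3)
print(items)  # [1 2 3]
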
12 changes: 12 additions & 0 deletions clickhouse_driver/columns/numpy/datecolumn.py
@@ -0,0 +1,12 @@
import numpy as np

from .base import NumpyColumn


class NumpyDateColumn(NumpyColumn):
dtype = np.dtype(np.uint16)
ch_type = 'Date'

def read_items(self, n_items, buf):
data = super(NumpyDateColumn, self).read_items(n_items, buf)
return data.astype('datetime64[D]')
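
ClickHouse sends Date as a UInt16 number of days since 1970-01-01, so the astype above is a direct reinterpretation; a tiny worked example:

import numpy as np

days = np.array([0, 18262], dtype=np.uint16)  # raw Date values from the wire
print(days.astype('datetime64[D]'))           # ['1970-01-01' '2020-01-01']
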
102 changes: 102 additions & 0 deletions clickhouse_driver/columns/numpy/datetimecolumn.py
@@ -0,0 +1,102 @@
try:
import numpy as np
except ImportError:
    np = None

try:
import pandas as pd
except ImportError:
    pd = None

from pytz import timezone as get_timezone
from tzlocal import get_localzone

from .base import NumpyColumn


class NumpyDateTimeColumn(NumpyColumn):
dtype = np.dtype(np.uint32)

def __init__(self, timezone=None, offset_naive=True, local_timezone=None,
**kwargs):
self.timezone = timezone
self.offset_naive = offset_naive
self.local_timezone = local_timezone
super(NumpyDateTimeColumn, self).__init__(**kwargs)

def apply_timezones(self, dt):
ts = pd.to_datetime(dt, utc=True)
timezone = self.timezone if self.timezone else self.local_timezone

ts = ts.tz_convert(timezone)
if self.offset_naive:
ts = ts.tz_localize(None)

return ts.to_numpy()

def read_items(self, n_items, buf):
data = super(NumpyDateTimeColumn, self).read_items(n_items, buf)
dt = data.astype('datetime64[s]')
return self.apply_timezones(dt)


class NumpyDateTime64Column(NumpyDateTimeColumn):
dtype = np.dtype(np.uint64)

max_scale = 6

def __init__(self, scale=0, **kwargs):
self.scale = scale
super(NumpyDateTime64Column, self).__init__(**kwargs)

def read_items(self, n_items, buf):
scale = 10 ** self.scale
frac_scale = 10 ** (self.max_scale - self.scale)

data = super(NumpyDateTimeColumn, self).read_items(n_items, buf)
seconds = (data // scale).astype('datetime64[s]')
microseconds = ((data % scale) * frac_scale).astype('timedelta64[us]')

dt = seconds + microseconds
return self.apply_timezones(dt)
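
A worked example of the scale arithmetic in read_items above for DateTime64(3): the stored integer splits into whole seconds and a remainder rescaled to microseconds (max_scale = 6); the timezone handling from apply_timezones is omitted here.

import numpy as np

scale, frac_scale = 10 ** 3, 10 ** (6 - 3)
data = np.array([1600000000123], dtype=np.uint64)

seconds = (data // scale).astype('datetime64[s]')
micros = ((data % scale) * frac_scale).astype('timedelta64[us]')
print(seconds + micros)  # ['2020-09-13T12:26:40.123000']
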


def create_numpy_datetime_column(spec, column_options):
if spec.startswith('DateTime64'):
cls = NumpyDateTime64Column
spec = spec[11:-1]
params = spec.split(',', 1)
column_options['scale'] = int(params[0])
if len(params) > 1:
spec = params[1].strip() + ')'
else:
cls = NumpyDateTimeColumn
spec = spec[9:]

context = column_options['context']

tz_name = timezone = None
offset_naive = True
local_timezone = None

# NumPy does not use the local timezone when converting a timestamp to
# datetime, so we always need to detect the local timezone for manual conversion.
try:
local_timezone = get_localzone().zone
except Exception:
pass

# Use column's timezone if it's specified.
if spec and spec[-1] == ')':
tz_name = spec[1:-2]
offset_naive = False
else:
if not context.settings.get('use_client_time_zone', False):
if local_timezone != context.server_info.timezone:
tz_name = context.server_info.timezone

if tz_name:
timezone = get_timezone(tz_name)

return cls(timezone=timezone, offset_naive=offset_naive,
local_timezone=local_timezone, **column_options)
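
A quick trace of the spec slicing above for a hypothetical DateTime64 type string:

spec = "DateTime64(6, 'Europe/Moscow')"
inner = spec[11:-1]               # "6, 'Europe/Moscow'"
scale, rest = inner.split(',', 1)
tz_spec = rest.strip() + ')'      # "'Europe/Moscow')"
print(int(scale), tz_spec[1:-2])  # 6 Europe/Moscow
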
13 changes: 13 additions & 0 deletions clickhouse_driver/columns/numpy/floatcolumn.py
@@ -0,0 +1,13 @@
import numpy as np

from .base import NumpyColumn


class NumpyFloat32Column(NumpyColumn):
dtype = np.dtype(np.float32)
ch_type = 'Float32'


class NumpyFloat64Column(NumpyColumn):
dtype = np.dtype(np.float64)
ch_type = 'Float64'
43 changes: 43 additions & 0 deletions clickhouse_driver/columns/numpy/intcolumn.py
@@ -0,0 +1,43 @@
import numpy as np

from .base import NumpyColumn


class NumpyInt8Column(NumpyColumn):
dtype = np.dtype(np.int8)
ch_type = 'Int8'


class NumpyUInt8Column(NumpyColumn):
dtype = np.dtype(np.uint8)
ch_type = 'UInt8'


class NumpyInt16Column(NumpyColumn):
dtype = np.dtype(np.int16)
ch_type = 'Int16'


class NumpyUInt16Column(NumpyColumn):
dtype = np.dtype(np.uint16)
ch_type = 'UInt16'


class NumpyInt32Column(NumpyColumn):
dtype = np.dtype(np.int32)
ch_type = 'Int32'


class NumpyUInt32Column(NumpyColumn):
dtype = np.dtype(np.uint32)
ch_type = 'UInt32'


class NumpyInt64Column(NumpyColumn):
dtype = np.dtype(np.int64)
ch_type = 'Int64'


class NumpyUInt64Column(NumpyColumn):
dtype = np.dtype(np.uint64)
ch_type = 'UInt64'
(The remaining changed files are not shown here.)
