diff --git a/CHANGELOG.md b/CHANGELOG.md index 16f035a4..255be7dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Features 1. [#281](https://github.com/influxdata/influxdb-client-python/pull/281): `FluxTable`, `FluxColumn` and `FluxRecord` objects have helpful reprs +1. [#293](https://github.com/influxdata/influxdb-client-python/pull/293): `dataframe_serializer` supports batching ### Bug Fixes 1. [#283](https://github.com/influxdata/influxdb-client-python/pull/283): Set proxy server in config file diff --git a/examples/README.md b/examples/README.md index cafef218..2cb4ac72 100644 --- a/examples/README.md +++ b/examples/README.md @@ -4,6 +4,7 @@ - [import_data_set.py](import_data_set.py) - How to import CSV file - [import_data_set_multiprocessing.py](import_data_set_multiprocessing.py) - How to large CSV file by Python Multiprocessing - [ingest_dataframe_default_tags.py](ingest_dataframe_default_tags.py) - How to ingest DataFrame with default tags +- [ingest_large_dataframe.py](ingest_large_dataframe.py) - How to ingest large DataFrame - [iot_sensor.py](iot_sensor.py) - How to write sensor data every minute by [RxPY](https://rxpy.readthedocs.io/en/latest/) - [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB diff --git a/examples/ingest_large_dataframe.py b/examples/ingest_large_dataframe.py new file mode 100644 index 00000000..b653ac15 --- /dev/null +++ b/examples/ingest_large_dataframe.py @@ -0,0 +1,69 @@ +""" +How to ingest large DataFrame by splitting into chunks. +""" +import logging +import random +from datetime import datetime + +from influxdb_client import InfluxDBClient +from influxdb_client.extras import pd, np + +""" +Enable logging for DataFrame serializer +""" +loggerSerializer = logging.getLogger('influxdb_client.client.write.dataframe_serializer') +loggerSerializer.setLevel(level=logging.DEBUG) +handler = logging.StreamHandler() +handler.setFormatter(logging.Formatter('%(asctime)s | %(message)s')) +loggerSerializer.addHandler(handler) + +""" +Configuration +""" +url = 'http://localhost:8086' +token = 'my-token' +org = 'my-org' +bucket = 'my-bucket' + +""" +Generate Dataframe +""" +print() +print("=== Generating DataFrame ===") +print() +dataframe_rows_count = 150_000 + +col_data = { + 'time': np.arange(0, dataframe_rows_count, 1, dtype=int), + 'tag': np.random.choice(['tag_a', 'tag_b', 'test_c'], size=(dataframe_rows_count,)), +} +for n in range(2, 2999): + col_data[f'col{n}'] = random.randint(1, 10) + +data_frame = pd.DataFrame(data=col_data).set_index('time') +print(data_frame) + +""" +Ingest DataFrame +""" +print() +print("=== Ingesting DataFrame via batching API ===") +print() +startTime = datetime.now() + +with InfluxDBClient(url=url, token=token, org=org) as client: + + """ + Use batching API + """ + with client.write_api() as write_api: + write_api.write(bucket=bucket, record=data_frame, + data_frame_tag_columns=['tag'], + data_frame_measurement_name="measurement_name") + print() + print("Wait to finishing ingesting DataFrame...") + print() + +print() +print(f'Import finished in: {datetime.now() - startTime}') +print() diff --git a/influxdb_client/client/write/dataframe_serializer.py b/influxdb_client/client/write/dataframe_serializer.py index cf7bb898..a0825cc6 100644 --- a/influxdb_client/client/write/dataframe_serializer.py +++ b/influxdb_client/client/write/dataframe_serializer.py @@ -4,12 +4,15 @@ Much of the code 
here is inspired by that in the aioinflux packet found here: https://github.com/gusutabopb/aioinflux """ -import re +import logging import math +import re from influxdb_client import WritePrecision from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT, DEFAULT_WRITE_PRECISION +logger = logging.getLogger(__name__) + def _itertuples(data_frame): cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))] @@ -24,6 +27,238 @@ def _any_not_nan(p, indexes): return any(map(lambda x: _not_nan(p[x]), indexes)) +class DataframeSerializer: + """Serialize DataFrame into LineProtocols.""" + + def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION, chunk_size: int = None, + **kwargs) -> None: + """ + Init serializer. + + :param data_frame: Pandas DataFrame to serialize + :param point_settings: Default Tags + :param precision: The precision for the unix timestamps within the body line-protocol. + :param chunk_size: The size of chunk for serializing into chunks. + :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame + :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields + """ + # This function is hard to understand but for good reason: + # the approach used here is considerably more efficient + # than the alternatives. + # + # We build up a Python expression that efficiently converts a data point + # tuple into line-protocol entry, and then evaluate the expression + # as a lambda so that we can call it. This avoids the overhead of + # invoking a function on every data value - we only have one function + # call per row instead. The expression consists of exactly + # one f-string, so we build up the parts of it as segments + # that are concatenated together to make the full f-string inside + # the lambda. + # + # Things are made a little more complex because fields and tags with NaN + # values and empty tags are omitted from the generated line-protocol + # output. + # + # As an example, say we have a data frame with two value columns: + # a float + # b int + # + # This will generate a lambda expression to be evaluated that looks like + # this: + # + # lambda p: f"""{measurement_name} {keys[0]}={p[1]},{keys[1]}={p[2]}i {p[0].value}""" + # + # This lambda is then executed for each row p. + # + # When NaNs are present, the expression looks like this (split + # across two lines to satisfy the code-style checker) + # + # lambda p: f"""{measurement_name} {"" if math.isnan(p[1]) + # else f"{keys[0]}={p[1]}"},{keys[1]}={p[2]}i {p[0].value}""" + # + # When there's a NaN value in column a, we'll end up with a comma at the start of the + # fields, so we run a regexp substitution after generating the line-protocol entries + # to remove this. + # + # We're careful to run these potentially costly extra steps only when NaN values actually + # exist in the data. + + from ...extras import pd, np + if not isinstance(data_frame, pd.DataFrame): + raise TypeError('Must be DataFrame, but type was: {0}.' + .format(type(data_frame))) + + data_frame_measurement_name = kwargs.get('data_frame_measurement_name') + if data_frame_measurement_name is None: + raise TypeError('"data_frame_measurement_name" is a Required Argument') + + data_frame = data_frame.copy(deep=False) + if isinstance(data_frame.index, pd.PeriodIndex): + data_frame.index = data_frame.index.to_timestamp() + else: + # TODO: this is almost certainly not what you want + # when the index is the default RangeIndex. 
+ # Instead, it would probably be better to leave + # out the timestamp unless a time column is explicitly + # enabled. + data_frame.index = pd.to_datetime(data_frame.index) + + if data_frame.index.tzinfo is None: + data_frame.index = data_frame.index.tz_localize('UTC') + + data_frame_tag_columns = kwargs.get('data_frame_tag_columns') + data_frame_tag_columns = set(data_frame_tag_columns or []) + + # keys holds a list of string keys. + keys = [] + # tags holds a list of tag f-string segments ordered alphabetically by tag key. + tags = [] + # fields holds a list of field f-string segments ordered alphebetically by field key + fields = [] + # field_indexes holds the index into each row of all the fields. + field_indexes = [] + + if point_settings.defaultTags: + for key, value in point_settings.defaultTags.items(): + # Avoid overwriting existing data if there's a column + # that already exists with the default tag's name. + # Note: when a new column is added, the old DataFrame + # that we've made a shallow copy of is unaffected. + # TODO: when there are NaN or empty values in + # the column, we could make a deep copy of the + # data and fill in those values with the default tag value. + if key not in data_frame.columns: + data_frame[key] = value + data_frame_tag_columns.add(key) + + # Get a list of all the columns sorted by field/tag key. + # We want to iterate through the columns in sorted order + # so that we know when we're on the first field so we + # can know whether a comma is needed for that + # field. + columns = sorted(enumerate(data_frame.dtypes.items()), key=lambda col: col[1][0]) + + # null_columns has a bool value for each column holding + # whether that column contains any null (NaN or None) values. + null_columns = data_frame.isnull().any() + + # Iterate through the columns building up the expression for each column. + for index, (key, value) in columns: + key = str(key) + key_format = f'{{keys[{len(keys)}]}}' + keys.append(key.translate(_ESCAPE_KEY)) + # The field index is one more than the column index because the + # time index is at column zero in the finally zipped-together + # result columns. + field_index = index + 1 + val_format = f'p[{field_index}]' + + if key in data_frame_tag_columns: + # This column is a tag column. + if null_columns[index]: + key_value = f"""{{ + '' if {val_format} == '' or type({val_format}) == float and math.isnan({val_format}) else + f',{key_format}={{str({val_format}).translate(_ESCAPE_STRING)}}' + }}""" + else: + key_value = f',{key_format}={{str({val_format}).translate(_ESCAPE_KEY)}}' + tags.append(key_value) + continue + + # This column is a field column. + # Note: no comma separator is needed for the first field. + # It's important to omit it because when the first + # field column has no nulls, we don't run the comma-removal + # regexp substitution step. 
+ sep = '' if len(field_indexes) == 0 else ',' + if issubclass(value.type, np.integer): + field_value = f"{sep}{key_format}={{{val_format}}}i" + elif issubclass(value.type, np.bool_): + field_value = f'{sep}{key_format}={{{val_format}}}' + elif issubclass(value.type, np.floating): + if null_columns[index]: + field_value = f"""{{"" if math.isnan({val_format}) else f"{sep}{key_format}={{{val_format}}}"}}""" + else: + field_value = f'{sep}{key_format}={{{val_format}}}' + else: + if null_columns[index]: + field_value = f"""{{ + '' if type({val_format}) == float and math.isnan({val_format}) else + f'{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"' + }}""" + else: + field_value = f'''{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"''' + field_indexes.append(field_index) + fields.append(field_value) + + measurement_name = str(data_frame_measurement_name).translate(_ESCAPE_MEASUREMENT) + + tags = ''.join(tags) + fields = ''.join(fields) + timestamp = '{p[0].value}' + if precision == WritePrecision.US: + timestamp = '{int(p[0].value / 1e3)}' + elif precision == WritePrecision.MS: + timestamp = '{int(p[0].value / 1e6)}' + elif precision == WritePrecision.S: + timestamp = '{int(p[0].value / 1e9)}' + + f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', { + 'measurement_name': measurement_name, + '_ESCAPE_KEY': _ESCAPE_KEY, + '_ESCAPE_STRING': _ESCAPE_STRING, + 'keys': keys, + 'math': math, + }) + + for k, v in dict(data_frame.dtypes).items(): + if k in data_frame_tag_columns: + data_frame[k].replace('', np.nan, inplace=True) + + self.data_frame = data_frame + self.f = f + self.field_indexes = field_indexes + self.first_field_maybe_null = null_columns[field_indexes[0] - 1] + + # + # prepare chunks + # + if chunk_size is not None: + self.number_of_chunks = int(math.ceil(len(data_frame) / float(chunk_size))) + self.chunk_size = chunk_size + else: + self.number_of_chunks = None + + def serialize(self, chunk_idx: int = None): + """ + Serialize chunk into LineProtocols. + + :param chunk_idx: The index of chunk to serialize. If `None` then serialize whole dataframe. + """ + if chunk_idx is None: + chunk = self.data_frame + else: + logger.debug("Serialize chunk %s/%s ...", chunk_idx + 1, self.number_of_chunks) + chunk = self.data_frame[chunk_idx * self.chunk_size:(chunk_idx + 1) * self.chunk_size] + + if self.first_field_maybe_null: + # When the first field is null (None/NaN), we'll have + # a spurious leading comma which needs to be removed. + lp = (re.sub('^((\\ |[^ ])* ),', '\\1', self.f(p)) + for p in filter(lambda x: _any_not_nan(x, self.field_indexes), _itertuples(chunk))) + return list(lp) + else: + return list(map(self.f, _itertuples(chunk))) + + def number_of_chunks(self): + """ + Return the number of chunks. + + :return: number of chunks or None if chunk_size is not specified. + """ + return self.number_of_chunks + + def data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION, **kwargs): """ Serialize DataFrame into LineProtocols. @@ -34,185 +269,4 @@ def data_frame_to_list_of_points(data_frame, point_settings, precision=DEFAULT_W :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields """ - # This function is hard to understand but for good reason: - # the approach used here is considerably more efficient - # than the alternatives. 
- # - # We build up a Python expression that efficiently converts a data point - # tuple into line-protocol entry, and then evaluate the expression - # as a lambda so that we can call it. This avoids the overhead of - # invoking a function on every data value - we only have one function - # call per row instead. The expression consists of exactly - # one f-string, so we build up the parts of it as segments - # that are concatenated together to make the full f-string inside - # the lambda. - # - # Things are made a little more complex because fields and tags with NaN - # values and empty tags are omitted from the generated line-protocol - # output. - # - # As an example, say we have a data frame with two value columns: - # a float - # b int - # - # This will generate a lambda expression to be evaluated that looks like - # this: - # - # lambda p: f"""{measurement_name} {keys[0]}={p[1]},{keys[1]}={p[2]}i {p[0].value}""" - # - # This lambda is then executed for each row p. - # - # When NaNs are present, the expression looks like this (split - # across two lines to satisfy the code-style checker) - # - # lambda p: f"""{measurement_name} {"" if math.isnan(p[1]) - # else f"{keys[0]}={p[1]}"},{keys[1]}={p[2]}i {p[0].value}""" - # - # When there's a NaN value in column a, we'll end up with a comma at the start of the - # fields, so we run a regexp substitution after generating the line-protocol entries - # to remove this. - # - # We're careful to run these potentially costly extra steps only when NaN values actually - # exist in the data. - - from ...extras import pd, np - if not isinstance(data_frame, pd.DataFrame): - raise TypeError('Must be DataFrame, but type was: {0}.' - .format(type(data_frame))) - - data_frame_measurement_name = kwargs.get('data_frame_measurement_name') - if data_frame_measurement_name is None: - raise TypeError('"data_frame_measurement_name" is a Required Argument') - - data_frame = data_frame.copy(deep=False) - if isinstance(data_frame.index, pd.PeriodIndex): - data_frame.index = data_frame.index.to_timestamp() - else: - # TODO: this is almost certainly not what you want - # when the index is the default RangeIndex. - # Instead, it would probably be better to leave - # out the timestamp unless a time column is explicitly - # enabled. - data_frame.index = pd.to_datetime(data_frame.index) - - if data_frame.index.tzinfo is None: - data_frame.index = data_frame.index.tz_localize('UTC') - - data_frame_tag_columns = kwargs.get('data_frame_tag_columns') - data_frame_tag_columns = set(data_frame_tag_columns or []) - - # keys holds a list of string keys. - keys = [] - # tags holds a list of tag f-string segments ordered alphabetically by tag key. - tags = [] - # fields holds a list of field f-string segments ordered alphebetically by field key - fields = [] - # field_indexes holds the index into each row of all the fields. - field_indexes = [] - - if point_settings.defaultTags: - for key, value in point_settings.defaultTags.items(): - # Avoid overwriting existing data if there's a column - # that already exists with the default tag's name. - # Note: when a new column is added, the old DataFrame - # that we've made a shallow copy of is unaffected. - # TODO: when there are NaN or empty values in - # the column, we could make a deep copy of the - # data and fill in those values with the default tag value. - if key not in data_frame.columns: - data_frame[key] = value - data_frame_tag_columns.add(key) - - # Get a list of all the columns sorted by field/tag key. 
- # We want to iterate through the columns in sorted order - # so that we know when we're on the first field so we - # can know whether a comma is needed for that - # field. - columns = sorted(enumerate(data_frame.dtypes.items()), key=lambda col: col[1][0]) - - # null_columns has a bool value for each column holding - # whether that column contains any null (NaN or None) values. - null_columns = data_frame.isnull().any() - - # Iterate through the columns building up the expression for each column. - for index, (key, value) in columns: - key = str(key) - key_format = f'{{keys[{len(keys)}]}}' - keys.append(key.translate(_ESCAPE_KEY)) - # The field index is one more than the column index because the - # time index is at column zero in the finally zipped-together - # result columns. - field_index = index + 1 - val_format = f'p[{field_index}]' - - if key in data_frame_tag_columns: - # This column is a tag column. - if null_columns[index]: - key_value = f"""{{ - '' if {val_format} == '' or type({val_format}) == float and math.isnan({val_format}) else - f',{key_format}={{str({val_format}).translate(_ESCAPE_STRING)}}' - }}""" - else: - key_value = f',{key_format}={{str({val_format}).translate(_ESCAPE_KEY)}}' - tags.append(key_value) - continue - - # This column is a field column. - # Note: no comma separator is needed for the first field. - # It's important to omit it because when the first - # field column has no nulls, we don't run the comma-removal - # regexp substitution step. - sep = '' if len(field_indexes) == 0 else ',' - if issubclass(value.type, np.integer): - field_value = f"{sep}{key_format}={{{val_format}}}i" - elif issubclass(value.type, np.bool_): - field_value = f'{sep}{key_format}={{{val_format}}}' - elif issubclass(value.type, np.floating): - if null_columns[index]: - field_value = f"""{{"" if math.isnan({val_format}) else f"{sep}{key_format}={{{val_format}}}"}}""" - else: - field_value = f'{sep}{key_format}={{{val_format}}}' - else: - if null_columns[index]: - field_value = f"""{{ - '' if type({val_format}) == float and math.isnan({val_format}) else - f'{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"' - }}""" - else: - field_value = f'''{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"''' - field_indexes.append(field_index) - fields.append(field_value) - - measurement_name = str(data_frame_measurement_name).translate(_ESCAPE_MEASUREMENT) - - tags = ''.join(tags) - fields = ''.join(fields) - timestamp = '{p[0].value}' - if precision == WritePrecision.US: - timestamp = '{int(p[0].value / 1e3)}' - elif precision == WritePrecision.MS: - timestamp = '{int(p[0].value / 1e6)}' - elif precision == WritePrecision.S: - timestamp = '{int(p[0].value / 1e9)}' - - f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', { - 'measurement_name': measurement_name, - '_ESCAPE_KEY': _ESCAPE_KEY, - '_ESCAPE_STRING': _ESCAPE_STRING, - 'keys': keys, - 'math': math, - }) - - for k, v in dict(data_frame.dtypes).items(): - if k in data_frame_tag_columns: - data_frame[k].replace('', np.nan, inplace=True) - - first_field_maybe_null = null_columns[field_indexes[0] - 1] - if first_field_maybe_null: - # When the first field is null (None/NaN), we'll have - # a spurious leading comma which needs to be removed. 
- lp = (re.sub('^((\\ |[^ ])* ),', '\\1', f(p)) - for p in filter(lambda x: _any_not_nan(x, field_indexes), _itertuples(data_frame))) - return list(lp) - else: - return list(map(f, _itertuples(data_frame))) + return DataframeSerializer(data_frame, point_settings, precision, **kwargs).serialize() diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 027b353e..3380224c 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -17,7 +17,7 @@ from influxdb_client import WritePrecision, WriteService from influxdb_client.client.util.helpers import get_org_query_param -from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points +from influxdb_client.client.write.dataframe_serializer import DataframeSerializer from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION from influxdb_client.client.write.retry import WritesRetry @@ -134,7 +134,7 @@ def __init__(self, key, data, size=1) -> None: pass def __str__(self) -> str: - return '_BatchItem[key:\'{}\', \'{}\']' \ + return '_BatchItem[key:\'{}\', size: \'{}\']' \ .format(str(self.key), str(self.size)) @@ -312,8 +312,8 @@ def _serialize(self, record, write_precision, payload, **kwargs): self._serialize(Point.from_dict(record, write_precision=write_precision), write_precision, payload, **kwargs) elif 'DataFrame' in type(record).__name__: - _data = data_frame_to_list_of_points(record, self._point_settings, write_precision, **kwargs) - self._serialize(_data, write_precision, payload, **kwargs) + serializer = DataframeSerializer(record, self._point_settings, write_precision, **kwargs) + self._serialize(serializer.serialize(), write_precision, payload, **kwargs) elif isinstance(record, Iterable): for item in record: @@ -338,9 +338,12 @@ def _write_batching(self, bucket, org, data, precision, **kwargs) elif 'DataFrame' in type(data).__name__: - self._write_batching(bucket, org, - data_frame_to_list_of_points(data, self._point_settings, precision, **kwargs), - precision, **kwargs) + serializer = DataframeSerializer(data, self._point_settings, precision, self._write_options.batch_size, + **kwargs) + for chunk_idx in range(serializer.number_of_chunks): + self._write_batching(bucket, org, + serializer.serialize(chunk_idx), + precision, **kwargs) elif isinstance(data, Iterable): for item in data: diff --git a/tests/test_WriteApiDataFrame.py b/tests/test_WriteApiDataFrame.py index 52c93378..c5724e4a 100644 --- a/tests/test_WriteApiDataFrame.py +++ b/tests/test_WriteApiDataFrame.py @@ -3,7 +3,8 @@ from datetime import timedelta from influxdb_client import InfluxDBClient, WriteOptions, WriteApi, WritePrecision -from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points +from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points, DataframeSerializer +from influxdb_client.client.write.point import DEFAULT_WRITE_PRECISION from influxdb_client.client.write_api import SYNCHRONOUS, PointSettings from tests.base_test import BaseTest @@ -352,3 +353,52 @@ def test_write_precision(self): precision=precision[0]) self.assertEqual(1, len(points)) self.assertEqual(f"h2o level=15i {precision[1]}", points[0]) + + +class DataSerializerChunksTest(unittest.TestCase): + def test_chunks(self): + from influxdb_client.extras import pd + data_frame = pd.DataFrame( + data=[ + ["a", 1, 2], + ["b", 3, 4], + ["c", 5, 6], + ["d", 7, 8], + ], + index=[1, 2, 3, 4], + columns=["tag", "field1", "field2"]) + 
+ # + # Batch size = 2 + # + serializer = DataframeSerializer(data_frame, PointSettings(), DEFAULT_WRITE_PRECISION, 2, + data_frame_measurement_name='m', data_frame_tag_columns={"tag"}) + self.assertEqual(2, serializer.number_of_chunks) + self.assertEqual(['m,tag=a field1=1i,field2=2i 1', + 'm,tag=b field1=3i,field2=4i 2'], serializer.serialize(chunk_idx=0)) + self.assertEqual(['m,tag=c field1=5i,field2=6i 3', + 'm,tag=d field1=7i,field2=8i 4'], serializer.serialize(chunk_idx=1)) + + # + # Batch size = 10 + # + serializer = DataframeSerializer(data_frame, PointSettings(), DEFAULT_WRITE_PRECISION, 10, + data_frame_measurement_name='m', data_frame_tag_columns={"tag"}) + self.assertEqual(1, serializer.number_of_chunks) + self.assertEqual(['m,tag=a field1=1i,field2=2i 1', + 'm,tag=b field1=3i,field2=4i 2', + 'm,tag=c field1=5i,field2=6i 3', + 'm,tag=d field1=7i,field2=8i 4' + ], serializer.serialize(chunk_idx=0)) + + # + # Batch size = 3 + # + serializer = DataframeSerializer(data_frame, PointSettings(), DEFAULT_WRITE_PRECISION, 3, + data_frame_measurement_name='m', data_frame_tag_columns={"tag"}) + self.assertEqual(2, serializer.number_of_chunks) + self.assertEqual(['m,tag=a field1=1i,field2=2i 1', + 'm,tag=b field1=3i,field2=4i 2', + 'm,tag=c field1=5i,field2=6i 3' + ], serializer.serialize(chunk_idx=0)) + self.assertEqual(['m,tag=d field1=7i,field2=8i 4'], serializer.serialize(chunk_idx=1))
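
Note on the approach (reviewer sketch, not part of the patch): the block comment in `DataframeSerializer.__init__` explains that a single f-string lambda is `eval`'d once per DataFrame and then mapped over the row tuples, and `serialize(chunk_idx)` simply slices the frame positionally, with `number_of_chunks = ceil(len(data_frame) / chunk_size)`. The standalone sketch below illustrates both ideas in miniature; `build_row_formatter` is a hypothetical helper, and it deliberately omits the tag handling, NaN/empty-value guards, escaping and timestamp precision that the real serializer implements.

```python
# Illustrative sketch only -- not the library implementation.
import math

import pandas as pd


def build_row_formatter(measurement_name, field_keys):
    # Bake every field expression into one f-string; p[0] is the row
    # timestamp, fields start at p[1] (mirroring _itertuples above).
    fields = ','.join(f'{key}={{p[{i + 1}]}}' for i, key in enumerate(field_keys))
    # Compiled once per DataFrame, then reused for every row -- this is what
    # avoids a per-value function call.
    return eval(f'lambda p: f"{measurement_name} {fields} {{p[0].value}}"')


df = pd.DataFrame({'a': [1.0, 2.5], 'b': [3.0, 4.0]},
                  index=pd.to_datetime(['2021-01-01', '2021-01-02']).tz_localize('UTC'))

formatter = build_row_formatter('m', df.columns)
rows = zip(df.index, *(df[col] for col in df.columns))
print([formatter(p) for p in rows])
# ['m a=1.0,b=3.0 1609459200000000000', 'm a=2.5,b=4.0 1609545600000000000']

# Chunking: chunk i covers rows [i * chunk_size, (i + 1) * chunk_size), so the
# batching write API can serialize and enqueue one slice at a time.
chunk_size = 1
number_of_chunks = int(math.ceil(len(df) / float(chunk_size)))
for chunk_idx in range(number_of_chunks):
    chunk = df[chunk_idx * chunk_size:(chunk_idx + 1) * chunk_size]
    lines = [formatter(p) for p in zip(chunk.index, *(chunk[c] for c in chunk.columns))]
    print(chunk_idx, lines)
```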