From 90be2337a65ce8716019c4caab5829e4569c1576 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Bedn=C3=A1=C5=99?= Date: Thu, 11 Mar 2021 11:52:08 +0100 Subject: [PATCH] docs: added an example how to use RxPY and sync batching (#202) --- CHANGELOG.md | 3 + README.rst | 22 ++++---- docs/usage.rst | 6 ++ examples/README.md | 18 ++++++ examples/import_data_set_sync_batching.py | 68 +++++++++++++++++++++++ 5 files changed, 105 insertions(+), 12 deletions(-) create mode 100644 examples/README.md create mode 100644 examples/import_data_set_sync_batching.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 38e27472..36afb4fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 1.16.0 [unreleased] +### Documentation +1. [#202](https://github.com/influxdata/influxdb-client-python/pull/202): Added an example how to use RxPY and sync batching + ## 1.15.0 [2021-03-05] ### Bug Fixes diff --git a/README.rst b/README.rst index b050ed08..cc9d396d 100644 --- a/README.rst +++ b/README.rst @@ -236,8 +236,6 @@ The data could be written as Batching """""""" -.. marker-batching-start - The batching is configurable by ``write_options``\ : .. list-table:: @@ -348,11 +346,9 @@ The batching is configurable by ``write_options``\ : _write_client.close() _client.close() -.. marker-batching-end Default Tags """""""""""" -.. marker-default-tags-start Sometimes is useful to store same information in every measurement e.g. ``hostname``, ``location``, ``customer``. The client is able to use static value or env property as a tag value. @@ -415,8 +411,6 @@ Examples: self.client = InfluxDBClient.from_env_properties() -.. marker-default-tags-end - Asynchronous client """"""""""""""""""" @@ -458,6 +452,8 @@ Data are writes in a synchronous HTTP request. client.close() +.. marker-writes-end + Queries ^^^^^^^ @@ -595,6 +591,8 @@ Output: Examples ^^^^^^^^ +.. marker-examples-start + How to efficiently import large dataset """"""""""""""""""""""""""""""""""""""" @@ -703,12 +701,8 @@ If you would like to import gigabytes of data then use our multiprocessing examp """ client.close() -.. marker-writes-end - - Efficiency write data from IOT sensor """"""""""""""""""""""""""""""""""""" -.. marker-iot-start * sources - `iot_sensor.py `_ @@ -791,8 +785,6 @@ Efficiency write data from IOT sensor input() -.. marker-iot-end - Connect to InfluxDB Cloud """"""""""""""""""""""""" The following example demonstrate a simplest way how to write and query date with the InfluxDB Cloud. @@ -888,6 +880,12 @@ The second example shows how to use client capabilities to realtime visualizatio .. image:: https://raw.githubusercontent.com/influxdata/influxdb-client-python/master/docs/images/realtime-result.gif +Other examples +"""""""""""""" + +You could find all examples at GitHub: `influxdb-client-python/examples `_. + +.. marker-examples-end Advanced Usage -------------- diff --git a/docs/usage.rst b/docs/usage.rst index 88ed8408..26f83156 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -43,3 +43,9 @@ Both request header and body will be logged to standard output. .. code-block:: python _client = InfluxDBClient(url="http://localhost:8086", token="my-token", debug=True, org="my-org") + +Examples +^^^^^^^^ +.. include:: ../README.rst + :start-after: marker-examples-start + :end-before: marker-examples-end \ No newline at end of file diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 00000000..91f248b7 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,18 @@ +# Examples + +## Writes +- [import_data_set.py](import_data_set.py) - How to import CSV file +- [import_data_set_multiprocessing.py](import_data_set_multiprocessing.py) - How to large CSV file by Python Multiprocessing +- [ingest_dataframe_default_tags.py](ingest_dataframe_default_tags.py) - How to ingest DataFrame with default tags +- [iot_sensor.py](iot_sensor.py) - How to write sensor data every minute by [RxPY](https://rxpy.readthedocs.io/en/latest/) +- [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB + +## Queries +- [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV` +- [query_from_file.py](query_from_file.py) - How to use a Flux query defined in a separate file + +## Others +- [influx_cloud.py](influx_cloud.py) - How to connect to InfluxDB 2 Cloud +- [influxdb_18_example.py](influxdb_18_example.py) - How to connect to InfluxDB 1.8 +- [nanosecond_precision.py](nanosecond_precision.py) - How to use nanoseconds precision + \ No newline at end of file diff --git a/examples/import_data_set_sync_batching.py b/examples/import_data_set_sync_batching.py new file mode 100644 index 00000000..19974314 --- /dev/null +++ b/examples/import_data_set_sync_batching.py @@ -0,0 +1,68 @@ +""" +How to use RxPY to prepare batches for synchronous write into InfluxDB +""" + +from csv import DictReader + +import rx +from rx import operators as ops + +from influxdb_client import InfluxDBClient, Point +from influxdb_client.client.write.retry import WritesRetry +from influxdb_client.client.write_api import SYNCHRONOUS + + +def csv_to_generator(csv_file_path): + """ + Parse your CSV file into generator + """ + for row in DictReader(open(csv_file_path, 'r')): + point = Point('financial-analysis') \ + .tag('type', 'vix-daily') \ + .field('open', float(row['VIX Open'])) \ + .field('high', float(row['VIX High'])) \ + .field('low', float(row['VIX Low'])) \ + .field('close', float(row['VIX Close'])) \ + .time(row['Date']) + yield point + + +""" +Define Retry strategy - 3 attempts => 2, 4, 8 +""" +retries = WritesRetry(total=3, backoff_factor=1, exponential_base=2) +client = InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org', retries=retries) + +""" +Use synchronous version of WriteApi to strongly depends on result of write +""" +write_api = client.write_api(write_options=SYNCHRONOUS) + +""" +Prepare batches from generator +""" +batches = rx \ + .from_iterable(csv_to_generator('vix-daily.csv')) \ + .pipe(ops.buffer_with_count(500)) + + +def write_batch(batch): + """ + Synchronous write + """ + print(f'Writing... {len(batch)}') + write_api.write(bucket='my-bucket', record=batch) + + +""" +Write batches +""" +batches.subscribe(on_next=lambda batch: write_batch(batch), + on_error=lambda ex: print(f'Unexpected error: {ex}'), + on_completed=lambda: print('Import finished!')) + +""" +Dispose client +""" +write_api.close() +client.close()