docs: added an example how to use RxPY and sync batching (#202)
bednar authored Mar 11, 2021
1 parent 75f4579 commit 4f1e14e
Showing 5 changed files with 105 additions and 12 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
## 1.16.0 [unreleased]

### Documentation
1. [#202](https://github.com/influxdata/influxdb-client-python/pull/202): Added an example of how to use RxPY and sync batching

## 1.15.0 [2021-03-05]

### Bug Fixes
22 changes: 10 additions & 12 deletions README.rst
@@ -236,8 +236,6 @@ The data could be written as
Batching
""""""""

.. marker-batching-start
The batching is configurable by ``write_options``\ :

.. list-table::
@@ -348,11 +346,9 @@ The batching is configurable by ``write_options``\ :
_write_client.close()
_client.close()
.. marker-batching-end
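For a rough feel of what the batching options mean, here is a minimal pure-Python sketch of size-or-time flushing (illustrative only — ``BatchBuffer`` is a made-up name, and the real ``WriteApi`` additionally handles jitter, retries and background scheduling):

```python
import time

class BatchBuffer:
    """Illustrative sketch: flush when `batch_size` records accumulate
    or `flush_interval` seconds have elapsed since the last flush."""

    def __init__(self, batch_size, flush_interval, on_flush):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.on_flush = on_flush
        self.records = []
        self.last_flush = time.monotonic()

    def add(self, record):
        self.records.append(record)
        if (len(self.records) >= self.batch_size
                or time.monotonic() - self.last_flush >= self.flush_interval):
            self.flush()

    def flush(self):
        # Hand the accumulated records to the sink and reset the timer.
        if self.records:
            self.on_flush(self.records)
            self.records = []
        self.last_flush = time.monotonic()

flushed = []
buffer = BatchBuffer(batch_size=2, flush_interval=60, on_flush=flushed.append)
for record in ['m1 value=1', 'm2 value=2', 'm3 value=3']:
    buffer.add(record)
buffer.flush()  # flush the final partial batch
print(flushed)  # [['m1 value=1', 'm2 value=2'], ['m3 value=3']]
```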
Default Tags
""""""""""""
.. marker-default-tags-start

Sometimes it is useful to store the same information in every measurement, e.g. ``hostname``, ``location``, ``customer``.
The client can use a static value or an environment property as a tag value.
@@ -415,8 +411,6 @@ Examples:
self.client = InfluxDBClient.from_env_properties()
.. marker-default-tags-end
Asynchronous client
"""""""""""""""""""

@@ -458,6 +452,8 @@ Data are written in a synchronous HTTP request.
client.close()
.. marker-writes-end
Queries
^^^^^^^

@@ -595,6 +591,8 @@ Output:
Examples
^^^^^^^^

.. marker-examples-start
How to efficiently import large dataset
"""""""""""""""""""""""""""""""""""""""

@@ -703,12 +701,8 @@ If you would like to import gigabytes of data then use our multiprocessing example
"""
client.close()
.. marker-writes-end
Efficiently write data from an IoT sensor
"""""""""""""""""""""""""""""""""""""""""
.. marker-iot-start

* sources - `iot_sensor.py <https://github.com/influxdata/influxdb-client-python/blob/master/examples/iot_sensor.py>`_

@@ -791,8 +785,6 @@ Efficiently write data from an IoT sensor
input()
.. marker-iot-end
Connect to InfluxDB Cloud
"""""""""""""""""""""""""
The following example demonstrates the simplest way to write and query data with InfluxDB Cloud.
Expand Down Expand Up @@ -888,6 +880,12 @@ The second example shows how to use client capabilities to realtime visualizatio

.. image:: https://raw.githubusercontent.com/influxdata/influxdb-client-python/master/docs/images/realtime-result.gif

Other examples
""""""""""""""

You can find all examples on GitHub: `influxdb-client-python/examples <https://github.com/influxdata/influxdb-client-python/tree/master/examples#examples>`_.

.. marker-examples-end
Advanced Usage
--------------
6 changes: 6 additions & 0 deletions docs/usage.rst
@@ -43,3 +43,9 @@ Both request header and body will be logged to standard output.
.. code-block:: python
_client = InfluxDBClient(url="http://localhost:8086", token="my-token", debug=True, org="my-org")
Examples
^^^^^^^^
.. include:: ../README.rst
:start-after: marker-examples-start
:end-before: marker-examples-end
18 changes: 18 additions & 0 deletions examples/README.md
@@ -0,0 +1,18 @@
# Examples

## Writes
- [import_data_set.py](import_data_set.py) - How to import a CSV file
- [import_data_set_multiprocessing.py](import_data_set_multiprocessing.py) - How to import a large CSV file using Python multiprocessing
- [ingest_dataframe_default_tags.py](ingest_dataframe_default_tags.py) - How to ingest a DataFrame with default tags
- [iot_sensor.py](iot_sensor.py) - How to write sensor data every minute using [RxPY](https://rxpy.readthedocs.io/en/latest/)
- [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for a synchronous write into InfluxDB
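The `import_data_set_sync_batching.py` example below configures `WritesRetry(total=3, backoff_factor=1, exponential_base=2)`. Assuming the usual urllib3-style formula `delay = backoff_factor * exponential_base ** attempt`, the waits between attempts come out to 2, 4 and 8 seconds. A hypothetical helper (not part of influxdb-client) to compute them:

```python
def backoff_delays(total, backoff_factor, exponential_base):
    # Hypothetical helper: assumes the urllib3-style formula
    # delay = backoff_factor * exponential_base ** attempt.
    return [backoff_factor * exponential_base ** attempt
            for attempt in range(1, total + 1)]

print(backoff_delays(3, 1, 2))  # [2, 4, 8]
```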

## Queries
- [query.py](query.py) - How to query data as `FluxTable`s, a `Stream`, or `CSV`
- [query_from_file.py](query_from_file.py) - How to use a Flux query defined in a separate file

## Others
- [influx_cloud.py](influx_cloud.py) - How to connect to InfluxDB 2 Cloud
- [influxdb_18_example.py](influxdb_18_example.py) - How to connect to InfluxDB 1.8
- [nanosecond_precision.py](nanosecond_precision.py) - How to use nanosecond precision

68 changes: 68 additions & 0 deletions examples/import_data_set_sync_batching.py
@@ -0,0 +1,68 @@
"""
How to use RxPY to prepare batches for synchronous write into InfluxDB
"""

from csv import DictReader

import rx
from rx import operators as ops

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write.retry import WritesRetry
from influxdb_client.client.write_api import SYNCHRONOUS


def csv_to_generator(csv_file_path):
    """
    Parse your CSV file into a generator of Points.
    """
    with open(csv_file_path, 'r') as csv_file:
        for row in DictReader(csv_file):
            point = Point('financial-analysis') \
                .tag('type', 'vix-daily') \
                .field('open', float(row['VIX Open'])) \
                .field('high', float(row['VIX High'])) \
                .field('low', float(row['VIX Low'])) \
                .field('close', float(row['VIX Close'])) \
                .time(row['Date'])
            yield point


"""
Define Retry strategy - 3 attempts => 2, 4, 8
"""
retries = WritesRetry(total=3, backoff_factor=1, exponential_base=2)
client = InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org', retries=retries)

"""
Use synchronous version of WriteApi to strongly depends on result of write
"""
write_api = client.write_api(write_options=SYNCHRONOUS)

"""
Prepare batches from generator
"""
batches = rx \
.from_iterable(csv_to_generator('vix-daily.csv')) \
.pipe(ops.buffer_with_count(500))


def write_batch(batch):
"""
Synchronous write
"""
print(f'Writing... {len(batch)}')
write_api.write(bucket='my-bucket', record=batch)


"""
Write batches
"""
batches.subscribe(on_next=lambda batch: write_batch(batch),
on_error=lambda ex: print(f'Unexpected error: {ex}'),
on_completed=lambda: print('Import finished!'))

"""
Dispose client
"""
write_api.close()
client.close()
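The `ops.buffer_with_count(500)` step above is what turns the flat stream of points into batches. A pure-Python equivalent of that operator's semantics, for illustration (the `buffer_with_count` function here is a sketch, not the RxPY API):

```python
from itertools import islice

def buffer_with_count(iterable, count):
    # Yield successive lists of at most `count` items, mirroring what
    # RxPY's ops.buffer_with_count does in the pipeline above.
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, count))
        if not batch:
            return
        yield batch

print(list(buffer_with_count(range(5), 2)))  # [[0, 1], [2, 3], [4]]
```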
