
Data loss but no exception raised #198

Closed
lafrech opened this issue Feb 24, 2021 · 7 comments · Fixed by #202

@lafrech
Contributor

lafrech commented Feb 24, 2021

TL;DR

Is it possible that the data fails to be written but no exception is raised?


Hi.

I'm currently investigating a recurring data loss issue.

I've been searching on the bug tracker but couldn't find anything relevant. Like in #80 (comment), I get no error. Opening a new issue as this is not related to #80.

I'm using default parameters (except a batch size of 5000 but I don't think it matters).

My program reads timeseries from CSV files in a directory and sends the data to InfluxDB. Once done, it records the last file modification time to know where to start next time, so as to only process new files.

There is not much exception handling. If an exception is not caught, the program just crashes. If this happens, the last modification times are not updated and the program reloads all the files next time.
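To illustrate the idea, the file selection is roughly the following (a minimal sketch with made-up names, not the actual importer code):

import json
from pathlib import Path

STATE_FILE = Path("last_mtimes.json")  # hypothetical state file

def select_new_files(directory):
    """Yield CSV files modified since the last recorded run."""
    last_mtimes = json.loads(STATE_FILE.read_text()) if STATE_FILE.exists() else {}
    for path in sorted(Path(directory).glob("*.csv")):
        mtime = path.stat().st_mtime
        if mtime > last_mtimes.get(str(path), 0):
            yield path, mtime

def record_mtimes(processed):
    """Persist the mtimes of imported files (only reached if nothing raised)."""
    STATE_FILE.write_text(json.dumps({str(path): mtime for path, mtime in processed}))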

Apparently, the program doesn't crash, but there is missing data in the database: batches of contiguous missing data. The size of the holes varies and generally spans several files.

Sometimes, files are half loaded, so it's not the time-based file filtering mechanism skipping a file it shouldn't.

If I re-run the program manually on incriminated files, the data is correctly loaded, so the files are correctly formed.

Hence, my question:

Is it possible that the data fails to be written but no exception is raised?

I understand there is a retry mechanism. I assume if influxdb_client raises no exception, then all data was correctly acknowledged by InfluxDB. There might have been retries but those were successful in the end.

Is this a safe assumption?

I do call __del__ on write_api and client, so I don't think it is a matter of flushing buffers. I tried once with debug=True and I could check there was no data read after the last POST to InfluxDB, so all data is sent before closing the program.

I wouldn't mind providing the code, but it's a bit convoluted, not really a minimal example. Since we do this kind of thing for different projects, I wrote a file importer class to handle the common part (database connection and file selection), and child classes just need to specify how to read the files and add ad-hoc tags from project-specific metadata. Not the ideal scenario for providing a reproducible case.

As such, this is not really a bug report, more of a clarification request. Please feel free to redirect me if this is not the right place to ask.

Thanks.

@bednar
Contributor

bednar commented Feb 25, 2021

Hi @lafrech,

thanks for using our client and for the detailed explanation of your problem.

Is it possible that the data fails to be written but no exception is raised?

The default configuration of WriteApi uses an underlying batching queue to flush data into InfluxDB. Unrecoverable exceptions are logged, and recoverable ones are handled with a retry strategy.

The following piece of code doesn't throw an exception because the record is processed in an underlying thread.

def write(self, record):
    """Write record to InfluxDB database"""
    if not DRY_RUN.get():
        self._write_api.write(
            bucket=self._database_cfg["bucket"],
            record=record,
        )

https://github.com/Nobatek/influxdb-file-importer/blob/0b7b83d130d1289fb5304741709c9684917c24a6/influxdb_file_importer.py#L55
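To actually see those logged errors and tune the batching, you can enable Python logging and pass explicit WriteOptions — a minimal sketch (the batch/retry values are only examples):

import logging

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import WriteOptions

# Surface the errors that the batching WriteApi only logs
logging.basicConfig(level=logging.DEBUG)

client = InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org')
write_api = client.write_api(write_options=WriteOptions(batch_size=5_000,
                                                        flush_interval=10_000,
                                                        retry_interval=5_000,
                                                        max_retries=5))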

My program reads timeseries from CSV files in a directory and sends the data to InfluxDB. Once done, it records the last file modification time to know where to start next time, so as to only process new files.

Could the files be updated during the import into InfluxDB?

If you want to rely strongly on a success response from InfluxDB, you could use something like this:

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write.retry import WritesRetry
from influxdb_client.client.write_api import SYNCHRONOUS

"""
Define Retry strategy - 3 attempts => 2, 4, 8
"""
retries = WritesRetry(total=3, backoff_factor=1, exponential_base=2)
client = InfluxDBClient(url='http://localhost:8087', token='my-token', org='my-org', retries=retries)

"""
Use the synchronous version of WriteApi to depend directly on the result of the write
"""
write_api = client.write_api(write_options=SYNCHRONOUS)

"""
Prepare my batches
"""
batch = Point("mem").tag("host", "host1").field("used_percent", 25.43234543)

try:
    write_api.write(bucket='my-bucket', record=batch)
except Exception as e:
    print(f'Something is wrong: {e}')

"""
Dispose client
"""
write_api.__del__()
client.close()

@lafrech
Contributor Author

lafrech commented Mar 1, 2021

Hi @bednar, and thanks for answering.

I don't think the files are modified while loading because they are created on the remote machine then synced. Good point, anyway. I just modified my script to get the mtime before loading the file so that if the file was modified while loading, it would be loaded again.

I've been reading your answer and the docs several times and things are still a bit unclear to me. I also dived into the code but it's a pretty big code base so I'd rather ask here than jump to wrong conclusions.

The WriteApi supports synchronous, asynchronous and batching writes into InfluxDB 2.0.

I understand "batching mode": write when the buffer is full rather than upon each wall to write method.

To me, "synchronous" is a bit ambiguous in this context, as it can mean "write upon each call to write, as opposed to batching, or write in the main thread, as opposed to write in a separate thread.

I guess "asynchronous" means "write in a separate thread" but this could be done either upon each call to write or only when the buffer is full, that is asynchronous batching.

In other words, all combinations of batching / non batching and sync / async should be possible.

Trying to sum this up in a table, here's what I understand. Is this correct?

Mode       Sync          Async
Not batch  SYNCHRONOUS   ASYNCHRONOUS
Batch      ?             BATCHING

I'd like to achieve batching, for performance reasons. I used to do it myself with my former implementation (I believe it relied on the old InfluxDB python lib) but since this lib does it, I'm using it from the lib.

I don't mind sending data in the main thread and IIUC it would make catching errors easier as exceptions would crash the app (which may or may not be wanted depending on the use case but one can always catch exceptions to keep the program running).

Is there a way to achieve batching and write in main thread? (That would be the ? cell in my table.)

Looks like you're suggesting that I use synchronous mode and prepare the batches in my code, so I guess batching mode in influxdb-client uses asynchronous write.

What I'd need could be just a flush(sync=True) flag. Let's call this an enhancement request, then.
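The intended usage would be something like this (hypothetical API — flush(sync=True) does not exist today, this is just to illustrate the request):

# Hypothetical API, for illustration only -- not implemented in influxdb-client
write_api = client.write_api()  # default batching mode
for record in records:
    write_api.write(bucket='my-bucket', record=record)  # desired behaviour: buffer only
write_api.flush(sync=True)  # write all buffered batches in the calling thread,
                            # raising if a batch ultimately fails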

Until then, assuming what I wrote above is correct, my options are

  • Use synchronous mode and never mind the performance. I think my volume of data allows it in daily usage, but it might be an issue when loading or reloading a whole history. And it is not really satisfying.

  • Use synchronous mode and manage batching in my client code. Would work. But not really satisfying either.

  • Keep things as they are and try to track/log those errors, at least to reload manually when they happen. Not really practical either. I could at least increase the number of retries.

I just tried to send data while being disconnected from the network and I confirm the script doesn't fail (this answers my initial question) and the data is sent in a separate thread.

Also, I'm really surprised those errors happen in the first place. I tried to look in InfluxDB logs (journalctl) but didn't find anything really explicit in there. Is there something specific I could search?

Thanks.

@bednar
Contributor

bednar commented Mar 1, 2021

To me, "synchronous" is a bit ambiguous in this context, as it can mean "write upon each call to write, as opposed to batching, or write in the main thread, as opposed to write in a separate thread.

Synchronous means "synchronously write upon each call to write".

I guess "asynchronous" means "write in a separate thread" but this could be done either upon each call to write or only when the buffer is full, that is asynchronous batching.

Asynchronous means "asynchronously write upon each call to write".

In other words, all combinations of batching / non batching and sync / async should be possible.

Trying to sum this up in a table, here's what I understand. Is this correct?

Mode       Sync          Async
Not batch  SYNCHRONOUS   ASYNCHRONOUS
Batch      ?             BATCHING

Mode          Thread      Batching
SYNCHRONOUS   main        no
ASYNCHRONOUS  background  no
BATCHING      background  yes
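In code, the mode is selected when the WriteApi is created, for example (a sketch; url/token are placeholders):

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import ASYNCHRONOUS, SYNCHRONOUS, WriteOptions, WriteType

client = InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org')

sync_api = client.write_api(write_options=SYNCHRONOUS)    # main thread, no batching
async_api = client.write_api(write_options=ASYNCHRONOUS)  # background, no batching
batch_api = client.write_api(write_options=WriteOptions(write_type=WriteType.batching,
                                                        batch_size=5_000))  # background, batching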

I'd like to achieve batching, for performance reasons. I used to do it myself with my former implementation (I believe it relied on the old InfluxDB python lib) but since this lib does it, I'm using it from the lib.

I don't mind sending data in the main thread and IIUC it would make catching errors easier as exceptions would crash the app (which may or may not be wanted depending on the use case but one can always catch exceptions to keep the program running).

Is there a way to achieve batching and write in main thread? (That would be the ? cell in my table.)

Currently not.

Looks like you're suggesting that I use synchronous mode and prepare the batches in my code, so I guess batching mode in influxdb-client uses asynchronous write.

What I'd need could be just a flush(sync=True) flag. Let's call this an enhancement request, then.

So, the client behaviour would be: buffer data until flush(sync=True), then start writing batches into InfluxDB. Is that right?

Until then, assuming what I wrote above is correct, my options are

  • Use synchronous mode and never mind the performance. I think my volume of data allows it in daily usage, but it might be an issue when loading or reloading a whole history. And it is not really satisfying.
  • Use synchronous mode and manage batching in my client code. Would work. But not really satisfying either.
  • Keep things as they are and try to track/log those errors, at least to reload manually when they happen. Not really practical either. I could at least increase the number of retries.

I just tried to send data while being disconnected from the network and I confirm the script doesn't fail. (This answers my initial question.)

Also, I'm really surprised those errors happen in the first place. I tried to look in InfluxDB logs (journalctl) but didn't find anything really explicit in there. Is there something specific I could search?

Any errors or warnings.

Thanks.

@lafrech
Contributor Author

lafrech commented Mar 1, 2021

Thanks for reading through my comment and confirming my understanding.

Yes, basically, what I'd like is batching in main thread.

Mode           Thread      Batching
SYNCHRONOUS    main        no
ASYNCHRONOUS   background  no
BATCHING       background  yes
SYNC_BATCHING  main        yes

So, the client behaviour will be: buffering data until flush(sync=True) and then start write batches into InfluxDB. Is it right?

On second thought, the sync flag might be a bad API, adding useless complexity.

It could be a flag on write; let's call it batch for clarity.

write(data, batch=False)  # False is current behaviour with `SYNCHRONOUS` / `ASYNCHRONOUS`.

But this conflicts with BATCHING mode.

Also, adding this to write would allow the same client to alternate batching and non-batching calls, which is useless IMHO. One might as well use two WriteApi instances.

The API that makes most sense to me would be to create a fourth mode (what I called SYNC_BATCHING above, short of a better name). At least from user (my) perspective. I can't tell from code perspective.
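In the meantime, batching in the main thread can of course be done in user code on top of the SYNCHRONOUS WriteApi — a rough sketch of what I mean by SYNC_BATCHING:

from influxdb_client.client.write_api import SYNCHRONOUS

class MainThreadBatcher:
    """Buffer records and write them in batches from the calling thread."""

    def __init__(self, client, bucket, batch_size=5_000):
        self._write_api = client.write_api(write_options=SYNCHRONOUS)
        self._bucket = bucket
        self._batch_size = batch_size
        self._buffer = []

    def write(self, record):
        self._buffer.append(record)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self):
        if self._buffer:
            # Raises in the calling thread if the write ultimately fails
            self._write_api.write(bucket=self._bucket, record=self._buffer)
            self._buffer = []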

@bednar
Contributor

bednar commented Mar 2, 2021

You could implement something like that with RxPY:

from csv import DictReader

import rx
from rx import operators as ops

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write.retry import WritesRetry
from influxdb_client.client.write_api import SYNCHRONOUS


def csv_to_generator(csv_file_path):
    """
    Parse your CSV file into a generator of Points
    """
    for row in DictReader(open(csv_file_path, "r")):
        point = Point('financial-analysis') \
            .tag("type", "vix-daily") \
            .field("open", float(row['VIX Open'])) \
            .field("high", float(row['VIX High'])) \
            .field("low", float(row['VIX Low'])) \
            .field("close", float(row['VIX Close'])) \
            .time(row['Date'])
        yield point


"""
Define Retry strategy - 3 attempts => 2, 4, 8
"""
retries = WritesRetry(total=3, backoff_factor=1, exponential_base=2)
client = InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org', retries=retries)

"""
Use the synchronous version of WriteApi to depend directly on the result of the write
"""
write_api = client.write_api(write_options=SYNCHRONOUS)

"""
Prepare batches from generator
"""
batches = rx \
    .from_iterable(csv_to_generator('vix-daily.csv')) \
    .pipe(ops.buffer_with_count(500))


def write_batch(batch):
    """
    Synchronous write
    """
    print(f"Writing... {len(batch)}")
    write_api.write(bucket='my-bucket', record=batch)


"""
Write batches
"""
batches.subscribe(on_next=lambda batch: write_batch(batch),
                  on_error=lambda ex: print(f'Unexpected error: {ex}'),
                  on_completed=lambda: print("Import finished!"))

"""
Dispose client
"""
write_api.close()
client.close()

@lafrech
Contributor Author

lafrech commented Mar 4, 2021

Thanks for the tip. I still think this mode would be a nice addition to the lib but doing it in user code as you demonstrate is not too much work.

I've been giving this a try and it seems to work like a charm. The use of generators actually makes the code even neater: only my lib needs to worry about write_api, and the import method overridden for each project only needs to yield records rather than write them to the database. Thus, the write_api object doesn't need to be accessed there and the code is simpler.
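The resulting structure is roughly this (a simplified sketch, not the real importer code):

import rx
from rx import operators as ops

from influxdb_client.client.write_api import SYNCHRONOUS

class FileImporter:
    """Base class: owns the WriteApi and batches whatever the child class yields."""

    def __init__(self, client, bucket, batch_size=5_000):
        self._write_api = client.write_api(write_options=SYNCHRONOUS)
        self._bucket = bucket
        self._batch_size = batch_size

    def read_file(self, path):
        """Overridden per project: yield Points, never touch the WriteApi."""
        raise NotImplementedError

    def run(self, files):
        records = (point for path in files for point in self.read_file(path))
        rx.from_iterable(records) \
            .pipe(ops.buffer_with_count(self._batch_size)) \
            .subscribe(on_next=lambda batch: self._write_api.write(bucket=self._bucket,
                                                                   record=batch))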

Thanks!

@bednar
Contributor

bednar commented Mar 8, 2021

I will create a new example for this use case, and if there are more requests for this mode, we will prepare an implementation in the client.
