Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Configurable batch size and max wait limit for targets #1876

Open
wants to merge 97 commits into
base: main
Choose a base branch
from

Conversation

BuzzCutNorman
Copy link
Contributor

@BuzzCutNorman BuzzCutNorman commented Jul 25, 2023

This is an attempt at implementing the batch size and wait limit for targets as outlined.

Closes #1626


📚 Documentation preview 📚: https://meltano-sdk--1876.org.readthedocs.build/en/1876/

@codecov
Copy link

codecov bot commented Jul 25, 2023

Codecov Report

Attention: 5 lines in your changes are missing coverage. Please review.

Comparison is base (5734e69) 88.47% compared to head (a0e34c9) 88.69%.

❗ Current head a0e34c9 differs from pull request most recent head 49a64e6. Consider uploading reports for the commit 49a64e6 to get more accurate results

Files Patch % Lines
singer_sdk/sinks/core.py 92.30% 2 Missing and 2 partials ⚠️
singer_sdk/helpers/_perftimer.py 98.73% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1876      +/-   ##
==========================================
+ Coverage   88.47%   88.69%   +0.21%     
==========================================
  Files          54       55       +1     
  Lines        4721     4857     +136     
  Branches      919      952      +33     
==========================================
+ Hits         4177     4308     +131     
- Misses        383      385       +2     
- Partials      161      164       +3     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@BuzzCutNorman BuzzCutNorman marked this pull request as ready for review August 8, 2023 22:32
Copy link
Collaborator

@edgarrmondragon edgarrmondragon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your work on this @BuzzCutNorman!

I think this

@property
def is_full(self) -> bool:
"""Check against size limit.
Returns:
True if the sink needs to be drained.
"""
return self.current_size >= self.max_size

has to be updated to take the timer into account, wdyt?

singer_sdk/helpers/_perftimer.py Outdated Show resolved Hide resolved
singer_sdk/helpers/_perftimer.py Outdated Show resolved Hide resolved
singer_sdk/helpers/_perftimer.py Outdated Show resolved Hide resolved
singer_sdk/helpers/_perftimer.py Outdated Show resolved Hide resolved
singer_sdk/helpers/_perftimer.py Outdated Show resolved Hide resolved
singer_sdk/helpers/_perftimer.py Outdated Show resolved Hide resolved
singer_sdk/helpers/_perftimer.py Outdated Show resolved Hide resolved
tests/core/test_perf_timer.py Outdated Show resolved Hide resolved
tests/core/test_perf_timer.py Outdated Show resolved Hide resolved
Comment on lines 402 to 409
# Finish Line for max_size perf counter
if self.sink_timer is not None:
if self.sink_timer.start_time is not None:
self.sink_timer.stop()
self.batch_size_rows = self.sink_timer.counter_based_max_size()

# Starting Line for max_size perf counter
self.sink_timer.start()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wdyt about moving this up the stack? Perhaps to

sink.process_batch(draining_status)

That way all targets, not just SQL, benefit from this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it. The code has been moved up. Please let me know if this matches up with what you had in mind.

@BuzzCutNorman
Copy link
Contributor Author

BuzzCutNorman commented Aug 14, 2023

Thanks for your work on this @BuzzCutNorman!

I think this

@property
def is_full(self) -> bool:
"""Check against size limit.
Returns:
True if the sink needs to be drained.
"""
return self.current_size >= self.max_size

has to be updated to take the timer into account, wdyt?

@edgarrmondragon I tried to account for this in the following way. I reflect the timer batch size row changes in max_size by changing self.max_size to point to self.batch_size_rows not self.MAX_SIZE_DEFAULT

    @property
    def max_size(self) -> int:
        """Get max batch size.

        Returns:
            Max number of records to batch before `is_full=True`
        """
        return self.batch_size_rows

Then have _batch_size_rows grab the size from the target config or MAX_SIZE_DEFAULT during initialization.


        # Batch Performace Timer
        self._batch_size_rows: int = target.config.get(
            "batch_size_rows",
            self.MAX_SIZE_DEFAULT,
        )

The timer then updates self.batch_size_rows via the setter associated to it.

 sink.batch_size_rows = sink.sink_timer.counter_based_max_size()

@BuzzCutNorman
Copy link
Contributor Author

BuzzCutNorman commented Aug 29, 2023

I'm failing to parse this

When the Meltano Target configuration option batch_dynamic_management is set to True you are asking the Sink.sink_timer to find the maximum rows is full mark that keeps the time to fill a batch with records and write those records to the Target's target within the time in seconds given.

@edgarrmondragon I might be overselling what the option is accomplishing. When I look at the steps in the drain_one method they are as follows:

        draining_status = sink.start_drain()
        sink.process_batch(draining_status)
        sink._lap_manager()
        sink.mark_drained()

The _lap_manager starts the clock just after a process_batch finishes. The batch performance timer is running when the batch is filled, the drain is started, and the process batch is run which I thought triggered writing the batched records to the target system. Once those steps are done _lap_manager stops the clock, calls counter_based_max_size which uses the difference between the lap time and batch_wait_limit_in_seconds, to determines if a correction needs to be made to the is full marker, then starts the clock again.

@BuzzCutNorman
Copy link
Contributor Author

@edgarrmondragon if you would please review this PR again when you get a chance.

docs/implementation/target_batch_full.md Outdated Show resolved Hide resolved
docs/implementation/target_batch_full.md Outdated Show resolved Hide resolved
docs/implementation/target_batch_full.md Outdated Show resolved Hide resolved
docs/implementation/target_batch_full.md Outdated Show resolved Hide resolved
docs/implementation/target_batch_full.md Outdated Show resolved Hide resolved
docs/implementation/target_batch_full.md Outdated Show resolved Hide resolved
docs/implementation/target_batch_full.md Outdated Show resolved Hide resolved
docs/implementation/target_batch_full.md Outdated Show resolved Hide resolved
docs/implementation/target_batch_full.md Outdated Show resolved Hide resolved
Comment on lines +56 to +58
When the Meltano Target configuration option `batch_dynamic_management` is set to `True` you are asking the `Sink.sink_timer` to find the maximum rows is full mark that keeps the time to fill a batch with records and write those records to the Target's target within the time in seconds given.

The `Sink.sink_timer` is passed the given `batch_size_rows` or the `DEFAULT_MAX_SIZE` constant which is `10000` if it is `None` and is also passed the given `batch_wait_limit_seconds` if present or the `WAIT_LIMIT_SECONDS_DEFAULT` constant which is `30` if it is `None`. Internally the `rows` passed turns into `Sink.sink_timer.SINK_MAX_SIZE_CEILING` which is the max size a batch can reach. The `time` in `seconds` passed turns into `Sink.sink_timer.max_perf_counter` which is the time in seconds a full cycle should take. The attribute `Sink.sink_timer.sink_max_size` starts at a predefined size of `100`. During the `Target.drain_one(Sink)` process `Sink._lap_manager` is called and the timer method `counter_based_max_size` runs and checks if `Sink.sink_timer.perf_diff`, which is `max_perf_counter` - `lap_time`, is greater than `Sink.sink_timer.perf_diff_allowed_max` or less than `Sink.sink_timer.perf_diff_allowed_min`. If `Sink.sink_timer.perf_diff` is greater than `Sink.sink_timer.perf_diff_allowed_max` the `Sink.sink_timer.sink_max_size` is increased as long as the `Sink.sink_timer.sink_max_size` is less than `Sink.sink_timer.SINK_MAX_SIZE_CEILING`. If `Sink.sink_timer.perf_diff` is less than `Sink.sink_timer.perf_diff_allowed_min` the `Sink.sink_timer.sink_max_size` is reduced. If the `Sink.sink_timer.perf_diff` is between `Sink.sink_timer.perf_diff_allowed_max` and `Sink.sink_timer.perf_diff_allowed_min` no correction to `Sink.sink_timer.sink_max_size` is made since the optimal rows size has been reached. This process is repeated when each `Sink` is initialized and starts processing records.
Copy link
Collaborator

@edgarrmondragon edgarrmondragon Sep 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still finding this a bit confusing. I think we want the max batch size to have a default value, and the max wait time to default to None so that in the default case, only the current batch size is checked.

Essentially:

flowchart TD
    A[Process next row] --> B{current_size ><br/>max_size?}:::q
    B -- Yes --> C[Drain]:::drain
    B -- No ---> D{is max_wait_time<br/>set?}:::q
    D -- No ---> A
    D -- Yes --> F{elapsed ><br/>max_wait_time?}:::q
    F -- No ---> A
    F -- Yes --> C
    C --> G[Reset counter<br/>and timer]
    G --> A
    classDef q fill:#FFE666
    classDef drain fill:#FF6680
Loading

Does it make sense? I'm sorry, I feel like I'm missing something 🙏

Copy link
Contributor Author

@BuzzCutNorman BuzzCutNorman Sep 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diagram makes sense. I will need to work through it again tomorrow. I think you feeling like you are missing something is a result of my poor writing. Below is the best flowchart for batch_dynamic_management I could make.

flowchart TD
    C1-->A2
    D2-->A3
    E3-->E2
    C3-->A4
    G4-->D3
    F2-->A1
    subgraph Target._process_record_message
    A1[Process next row] --> B1{is_full<p>_drain_function calls is_full_dynamic<p>sink.current_size > = sink_timer.sink_max_size}
    B1-->|True| C1[drain]
    B1-->|False| A1
   end
   subgraph Target.drain_one
   A2([start])-->B2[sink.start_drain]
   B2-->C2[sink.process_batch]
   C2-->D2[sink._lap_manager]
   D2~~~E2[sink.mark_drained]
   E2-->F2([end])
   end
   subgraph Sink.lap_manager
   A3([start])-->B3[sink_timer.stop]
   B3-->C3[sink_timer.counter_based_max_size]
   C3~~~D3[sink_timer.start]
   D3-->E3([end])  
   end
   subgraph sink_timer. counter_based_max_size
   A4([start])-->B4{perf_diff < self.perf_diff_allowed_min}
   B4-->|True| C4[correction = decrease amount]
   B4-->D4{perf_diff >= perf_diff_allowed_max<p>and<p>sink_max_size < SINK_MAX_SIZE_CEILING}
   D4-->|True| E4[correction = increase amount]
   D4-->F4[sink_max_size += correction]
   F4-->G4([end])
   end
Loading

Copy link
Contributor Author

@BuzzCutNorman BuzzCutNorman Sep 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the default case,

This would be when batch_dynamic_management is set to True in the target config and batch_size_rows and batch_wait_limit_seconds are not preset in the target config?

or

Are you meaning default as in none of the three target config settings batch_size_rows, batch_wait_limit_seconds, and batch_dynamic_management are present?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edgarrmondragon When you have a free moment would you please provide me clarification on what you see the "default case" to be. 🙏😃

@codspeed-hq
Copy link

codspeed-hq bot commented Oct 16, 2023

CodSpeed Performance Report

Merging #1876 will not alter performance

Comparing BuzzCutNorman:1626-configurable-batch-size-and-max-wait-limit-for-targets (49a64e6) with main (5734e69)

Summary

✅ 6 untouched benchmarks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

feat: Configurable batch size and max wait limit for targets
3 participants