Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Faster per-record processing #301

Merged
merged 4 commits into from
Jul 18, 2024
Merged

Conversation

aaronsteers
Copy link
Contributor

@aaronsteers aaronsteers commented Jul 18, 2024

This brings per-record parsing time down from approx. 20us to roughly 10us. We accomplish this by decoupling stream-level logic from record-level processing, meaning we build the logic handler for each stream at the start of the stream, and then reuse the handler's logic on those individual records.

image

Note:

  • This doesn't modify the operation to write to gzip file, which now is about equal with the parsing logic - roughly another 10-13us for each write to gzip file.
  • Since the gzip write is entirely one step performed by the gzip module, the only options to speed up are: (a) write more records at once instead of one at a time, (b) use another library if there is a faster option, (c) do both a+b but also just migrate to something else entirely like parquet.

Summary by CodeRabbit

  • New Features

    • Introduced StreamRecordHandler to enhance stream record processing capabilities.
  • Refactor

    • Updated several functions to use StreamRecordHandler instead of dictionaries for stream schemas, improving schema handling and processing efficiency.
  • Tests

    • Enhanced test cases to incorporate StreamRecordHandler with updated stream schema handling parameters.

Copy link

coderabbitai bot commented Jul 18, 2024

Walkthrough

Walkthrough

The recent changes focus on refactoring the code to utilize a new StreamRecordHandler class for managing stream records and handling JSON schemas more effectively. This involved updating function signatures to replace stream_schema dictionaries with StreamRecordHandler objects and adapting related imports and method calls across various modules and test files to support this new handler-based approach.

Changes

File(s) Change Summary
airbyte/_future_cdk/record_processor.py Updated to use StreamRecordHandler for stream schema handling, modifying import statements and function parameters accordingly.
airbyte/_future_cdk/sql_processor.py Similar changes as record_processor.py, replacing stream_schema with StreamRecordHandler in function signatures and imports.
airbyte/_processors/file/base.py Modified to replace stream_schema with StreamRecordHandler in process_record_message and updated imports.
airbyte/records.py Added StreamRecordHandler class and updated StreamRecord class methods to utilize this handler for schema and field management.
airbyte/sources/base.py Updated imports and added initialization of StreamRecordHandler for stream processing.
tests/unit_tests/test_text_normalization.py Added StreamRecordHandler to imports, refactored tests to use StreamRecordHandler for schema handling and adjusted test logic accordingly.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant RecordProcessor
    participant StreamRecordHandler
    participant StreamRecord

    Client->>RecordProcessor: Call process_record_message()
    RecordProcessor->>StreamRecordHandler: Initialize handler with schema
    RecordProcessor->>StreamRecord: Create StreamRecord with handler
    StreamRecord->>StreamRecordHandler: Validate and process record
    StreamRecordHandler-->>StreamRecord: Processed data
    StreamRecord-->>RecordProcessor: Processed StreamRecord
    RecordProcessor-->>Client: Return processed record
Loading
sequenceDiagram
    participant TestSuite
    participant TestCase
    participant StreamRecordHandler
    participant StreamRecord

    TestSuite->>TestCase: Run test cases
    TestCase->>StreamRecordHandler: Initialize handler with test schema
    TestCase->>StreamRecord: Create StreamRecord with handler
    StreamRecord->>StreamRecordHandler: Validate and process test record
    StreamRecordHandler-->>StreamRecord: Processed test data
    StreamRecord-->>TestCase: Return processed test record
    TestCase-->>TestSuite: Test pass/fail result
Loading

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 0616d8a and 7f583d5.

Files selected for processing (5)
  • airbyte/_future_cdk/record_processor.py (3 hunks)
  • airbyte/_future_cdk/sql_processor.py (3 hunks)
  • airbyte/_processors/file/base.py (3 hunks)
  • airbyte/records.py (3 hunks)
  • airbyte/sources/base.py (3 hunks)
Additional comments not posted (20)
airbyte/_processors/file/base.py (3)

16-16: Import statement change is appropriate.

The addition of StreamRecordHandler is necessary for the new functionality.


144-144: Function signature change is appropriate.

The function now accepts stream_record_handler instead of stream_schema, aligning with the new object-oriented approach.


169-169: Ensure correct usage of stream_record_handler.

The stream_record_handler is correctly used in the StreamRecord.from_record_message call.

airbyte/_future_cdk/record_processor.py (5)

26-26: Import statement change is appropriate.

The addition of StreamRecordHandler is necessary for the new functionality.


160-160: Function signature change is appropriate.

The function now accepts stream_record_handler instead of stream_schema, aligning with the new object-oriented approach.


184-184: Variable replacement is appropriate.

The variable stream_schemas has been replaced with stream_record_handlers, aligning with the new object-oriented approach.


192-197: Ensure correct instantiation of StreamRecordHandler.

The StreamRecordHandler is correctly instantiated with the required parameters.


202-202: Ensure correct usage of stream_record_handler.

The stream_record_handler is correctly used in the process_record_message call.

airbyte/records.py (6)

77-77: Import statement change is appropriate.

The import statement now includes only NameNormalizerBase, which is necessary for the new functionality.


92-97: New class StreamRecordHandler is appropriate.

The class introduction is necessary for the new object-oriented approach.


99-145: Ensure correct implementation of StreamRecordHandler methods.

The methods in StreamRecordHandler are correctly implemented to handle key normalization and processing.


194-221: Ensure correct implementation of StreamRecord methods.

The __init__ method and other methods in StreamRecord are correctly updated to handle field processing.


228-243: Ensure correct usage of stream_record_handler in from_record_message.

The stream_record_handler is correctly used in the from_record_message method.


247-279: Ensure correct implementation of dictionary operations in StreamRecord.

The methods __getitem__, __setitem__, __delitem__, and __contains__ are correctly updated to handle key processing and dictionary operations.

airbyte/sources/base.py (3)

7-7: Import statement change is appropriate.

The addition of StreamRecordHandler is necessary for the new functionality.


456-461: Ensure correct instantiation of StreamRecordHandler.

The StreamRecordHandler is correctly instantiated with the required parameters.


466-466: Ensure correct usage of stream_record_handler.

The stream_record_handler is correctly used in the StreamRecord.from_record_message call.

airbyte/_future_cdk/sql_processor.py (3)

65-65: Import statement for StreamRecordHandler added.

The import statement for StreamRecordHandler is correctly added.


231-231: Function signature updated to use stream_record_handler.

The function signature is updated to replace stream_schema with stream_record_handler.


242-242: Function call updated to use stream_record_handler.

The function call to process_record_message is updated to use stream_record_handler.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 7f583d5 and ad2a44d.

Files selected for processing (4)
  • airbyte/_future_cdk/record_processor.py (3 hunks)
  • airbyte/records.py (2 hunks)
  • airbyte/sources/base.py (3 hunks)
  • tests/unit_tests/test_text_normalization.py (4 hunks)
Files skipped from review as they are similar to previous changes (3)
  • airbyte/_future_cdk/record_processor.py
  • airbyte/records.py
  • airbyte/sources/base.py
Additional comments not posted (7)
tests/unit_tests/test_text_normalization.py (7)

5-5: Import statement addition looks good.

The addition of StreamRecordHandler is consistent with the refactoring described.


8-17: Fixture addition looks good.

The stream_json_schema fixture is well-defined and provides a reusable JSON schema for the tests.


20-36: Function modification looks good.

The changes to test_record_columns_list are consistent with the refactoring, and the function correctly initializes and uses StreamRecordHandler.


53-60: Function modification looks good.

The changes to test_case_insensitive_dict are consistent with the refactoring, and the function correctly initializes and uses StreamRecordHandler with the specified parameters.


119-128: Function modification looks good.

The changes to test_case_insensitive_dict_w are consistent with the refactoring, and the function correctly initializes and uses StreamRecordHandler with the updated schema.


154-163: Function modification looks good.

The changes to test_case_insensitive_w_pretty_keys are consistent with the refactoring, and the function correctly initializes and uses StreamRecordHandler with normalize_keys=False.


216-220: Function adjustment looks good.

The changes to test_lower_case_normalizer improve readability without altering the logic.

@aaronsteers aaronsteers merged commit ce20b56 into main Jul 18, 2024
19 checks passed
@aaronsteers aaronsteers deleted the feat/faster-record-processing branch July 18, 2024 19:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant