Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add KafkaBatchPublisher #19

Merged
merged 12 commits into from
Oct 3, 2024
Merged

Conversation

wtfzambo
Copy link
Contributor

@wtfzambo wtfzambo commented Sep 24, 2024

In this PR

  • KafkaBatchPublisher class as requested
  • Related unit tests
  • Some refactoring of adjacent unit tests
  • Dev package chispa for easy handling of pyspark DataFrames equality assertions (see screenshot for example)
  • Clean up of Pytest logs so now they're readable

chispa output example
image

@wtfzambo wtfzambo linked an issue Sep 24, 2024 that may be closed by this pull request
@wtfzambo wtfzambo changed the title feat: add kafka-ui service so I can see what goes on feat: add KafkaBatchPublisher Sep 24, 2024
@wtfzambo wtfzambo force-pushed the feature/14-add-kafka-batch-publisher branch from 9c9d7f0 to c87f136 Compare September 25, 2024 15:44
@wtfzambo wtfzambo marked this pull request as ready for review September 25, 2024 15:50
@wtfzambo wtfzambo requested a review from shahinism September 25, 2024 15:50
pyproject.toml Outdated
@@ -70,7 +71,15 @@ source = [
skip_empty = true

[tool.pytest.ini_options]
addopts = "-vv --tb=auto --disable-warnings"
addopts = "-v --tb=short --disable-warnings -ra --no-header --show-capture=log"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a description to this configuration and why it's added. Also drop --disable-warnings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I'm doing all this logging-related stuff because I use the terminal to run pytest. With default settings, the output is too cluttered with unnecessary information, making it painful to read.

  • vv was producing output too verbose with lots of unneeded information for testing purposes only
  • tb=short makes tracebacks for failed tests more concise: prints the failing line + related context instead of the whole damn function
  • ra prints a Recap at the end of the output. a stands for all tests except passed ones.
  • no-header reduces verbosity at the beginning of the output with little helpful info
  • show-capture=log This is because pytest, by default, captures EVERYTHING. Usually, this is not an issue until we work with libraries (spark) that produce A LOT of output which often we don't need for unit testing. This is captured and printed. On top of that, logging written by us gets printed twice, once in the captured stderr/out, once in the captured logs. The end result is that pytest output is very cluttered. By capturing log only, it's much cleaner without missing out useful info (especially logging of our own).

"""Initialize the KafkaBatchPublisher object.

Args:
kafka_options (dict[str, Any]): Kafka options for the connection.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With regards to your communication in Slack @wtfzambo I completely agree with you, this kind of configuration, is contradicting the whole purpose of with_config. So initially when we introduced it in the previous iteration, was we were taking in the kafka_options, as an argument list in the construct. If that's not necessary, why not just take in the KafkaConfig type here as the input to initialize the construct and drop with_config altogether?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Admittedly, I still have limited vision, but I'd also drop the with_config method.

Especially if KafkaReader and KafkaWriter are supposed to be made public to the user.

The only counter-aspect that I can think of is that passing the whole Config object is a lot more verbose than the current constructor method, which takes just the bare minimum of values to work with Kafka.

src/sparkle/writer/kafka_writer.py Outdated Show resolved Hide resolved
(
kafka_df.write.format("kafka")
.options(**self.kafka_options)
.option("topic", self.kafka_topic)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean we need to initialize one instance of this writer everytime we want to write to a topic? I think it's more flexible to get the topic name when we want to invoke write, that way, we can also get multiple topics, and just iterate if needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same in the other writer I copied from, and the other observation I had. I think I spoke to you about this on slack.

I said something like "the topic is set during the istantiation, so for each different topic we gotta create a different instance of the writer class?"

I kept it like this in the batch writer because it's not my design and IDK where we're going yet. I don't like to change other's designs until I know exactly what the route is.

tests/conftest.py Outdated Show resolved Hide resolved
def cleanup_logging_handlers():
"""Fixture to cleanup logging handlers after tests.

Prevents logging errors at the end of the report.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it causing any harm? If not I'd suggest to not implement it like this as it's just silencing any possible error in handlers, and with future updates might introduce unwanted behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it causing any harm?
To my eyes, yes. It's also related to pytest logs being cluttered.

This is a long standing known issue of pytest and a sufficiently deep rabbit hole: pytest-dev/pytest#5502 (comment).

There are some situations/tests (pardon the imprecision; I delved into this too many years ago) where pytest attempts to write to closed streams AFTER the session is over.

This results in error messages being printed at the very end of the pytest recap, which provide no useful information and clutter the output.

This fixture prevents this from happening, since it's scope is session it shouldn't tamper with anything test related.

def log_spark_dataframe(df: DataFrame, *, truncate: bool = False, name: str = "") -> None:
"""Logs the contents of a Spark DataFrame in tabular format.

Useful when Pytest is configured to capture only logs, so `df.show()` won't work.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it better to configure Pytest to not only capture logs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually yes, but see explanation above

Federico Zambelli and others added 12 commits October 3, 2024 10:22
Also:
- reshaped some data so that type checker stops complaining
Chispa doesn't have stubs or py.typed marker.
Mypy throws error for that. This seems unnecessarily strict for
a package that is just used for comparing dataframes in unit testing.

This commit disables the behavior for this specific package.
Co-authored-by: Reza Khanipour <[email protected]>
@wtfzambo wtfzambo force-pushed the feature/14-add-kafka-batch-publisher branch from 750c607 to f992d87 Compare October 3, 2024 08:49
@farbodahm farbodahm merged commit 91901f7 into main Oct 3, 2024
2 checks passed
@farbodahm farbodahm deleted the feature/14-add-kafka-batch-publisher branch October 3, 2024 09:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add Kafka batch publisher
3 participants