Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement aggregate on windows. #23

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft

Conversation

Wosin
Copy link
Contributor

@Wosin Wosin commented Feb 19, 2023

Attempt to implement aggregate on windows using Monoid for some simplicity.

I'm not too convinced about using the Monoid there so I'm happy to discuss that.

it("should apply aggregation based on Monoid") {
val env = FlinkExecutor.newEnv(parallelism = 1)
val stream = env.fromCollection((1 to 200).toList.map(_ => 1))
implicit val semigroup = new Monoid[Int] {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implicit val monoid

typeInformation: TypeInformation[O]): DataStream[O] = {
val reducer = new AggregateFunction[T, A, O] {

override def createAccumulator(): A = Monoid.empty[A]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though this sounds like a good idea, I have never used a Monoid in practice because Monoid.empty[A] lacks the context of a key (that's needed to do anything meaningly aggregation at scale).
If you find it useful though, happy to have it.

Copy link
Contributor Author

@Wosin Wosin Feb 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gonna remove that as it seems it doesn't make that much sense.

@ariskk
Copy link
Owner

ariskk commented Feb 21, 2023

sbt scalafmt is needed

@Wosin Wosin marked this pull request as draft February 23, 2023 23:20
@Wosin
Copy link
Contributor Author

Wosin commented Feb 23, 2023

Converting to Draft as I need to think a little bout how to model that best.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants