-
Notifications
You must be signed in to change notification settings - Fork 320
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support streaming jobs in Marquez #2682
Conversation
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## main #2682 +/- ##
============================================
+ Coverage 84.15% 84.24% +0.09%
- Complexity 1390 1405 +15
============================================
Files 249 249
Lines 6322 6371 +49
Branches 286 291 +5
============================================
+ Hits 5320 5367 +47
- Misses 850 851 +1
- Partials 152 153 +1 ☔ View full report in Codecov by Sentry. |
d7166cc
to
17810f2
Compare
8e55ff4
to
b4590d0
Compare
0f4c077
to
6008834
Compare
8efc020
to
2ff9048
Compare
17810f2
to
c00b988
Compare
e90d7ec
to
9d2eea8
Compare
* | ||
* <p>In this case, a job version is created based on the list of input and output datasets | ||
* referenced by this job. If a job starts with inputs:{A,B} and outputs:{C}, new job version is | ||
* created immediately at job start. If a following event produces inputs:{A}, outputs:{C}, then |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a job has {A,B}
as initial inputs, the only {A}
shouldn't that be a new job version? Given that all inputs/outputs are expected when a job run has been started, we should create a job version anytime the inputs or outputs change and associated the run with the new job version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Existing algorithm to compute version of the job relies on all the inputs & datasets for the particular run. We should not modify it, as this would cause new job version for all the existing jobs, but we could create a separate version to evaluate version of the streaming job if we wanted.
However, when looking into the approach, I found it useful. It's a cumulative approach, where new job version is created if a new input/output dataset is involved in processing. If some dataset was included in the past events, but is no longer present, the version does not get change.
The benefit of this assumption is that we don't require producer to emit all the datasets all the time. If you emit amount of bytes written into output dataset, without containing input dataset in the event, it doesn't mean there is new job version without the inputs.
Is it OK?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do feel if a stream is removed, you'll want to remove that edge from the current lineage graph. But, I also understand the limitations here, as you mentioned, with bytes written to an output dataset present with no input datasets specified; that said, and after thinking it through, this may be a noop and the logic you have is reasonable.
For example, let's say we have a streaming job X
with inputs {A,B}
and outputs C
. The job runs and is marked as RUNNING
with the run ID 74f117af-eb90-4ffd-98e1-c1bc215934df
. To change the inputs from {A,B}
to {B}
(or similarly for the outputs), the user will have to redeploy the job (with new code possibly) and therefore be associated with a new run ID. So, what I think you have is logically correct given how streaming jobs are deployed. For batch jobs, versioning is more straight forward as we version from run-to-run.
api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We finally have support for streaming jobs! 💯 💯 💯
c00b988
to
a8cdbe0
Compare
Signed-off-by: Pawel Leszczynski <[email protected]>
Signed-off-by: Pawel Leszczynski <[email protected]>
9d2eea8
to
93922db
Compare
✅ Deploy Preview for peppy-sprite-186812 canceled.
|
Signed-off-by: Pawel Leszczynski <[email protected]>
93922db
to
a4e6fe7
Compare
Problem
Currently,
job_version
is created once the job finishes. This assumption no longer holds for streaming jobs which can run for days or weeks while continuously writing output datasets.It makes sense in this case, to create job version and identify input/output datasets at the beginning of the job and modify it on the fly if a job changes.
In particular, a
lineage
endpoint shall include streaming jobs in lineage graph as they're running and datasets being written by streaming jobs should be available (and have dataset version modified) once the job is started.Solution
JobTypeJobFacet
to determine if a job isbatch
orstreaming
jobs
table.One-line summary:
Checklist
CHANGELOG.md
(Depending on the change, this may not be necessary)..sql
database schema migration according to Flyway's naming convention (if relevant)