diff --git a/spec/OpenLineage.md b/spec/OpenLineage.md new file mode 100644 index 0000000000000..a806892012967 --- /dev/null +++ b/spec/OpenLineage.md @@ -0,0 +1,113 @@ +# OpenLineage Spec + +## Specification + +The specification for OpenLineage is formalized as an OpenAPI spec: [OpenLineage.yml](OpenLineage.yml) +published at: https://openlineage.github.io/ +It allows extensions to the spec using `Custom Facets` as described in this document. + +## Core concepts + +### Core Lineage Model + +![Open Lineage model](OpenLineageModel.svg) + +- **Run State Update**: and event describing an observed state of a job run. It is required to at least send one event for a START transition and a COMPLETE/FAIL/ABORT transition. Aditional events are optional. + +- **Job**: a process definition that consumes and produces datasets (defined as its inputs and outputs). It is identified by a unique name within a namespace (which is typicaly assigned to the scheduler starting the jobs). The *Job* evolves over time and this change is captured when the job runs. + +- **Dataset**: an abstract representation of data. It has a unique name within a namespace derived from its physical location (for example db.host.database.schema.table). Typicaly, a *Dataset* changes when a job writing to it completes. + +- **Run**: An instance of a running job with a start and completion (or failure) time. It is uniquely identified by an id relative to its job definition. + +- **Facet**: A piece of metadata attached to one of the entities defined above. + +example: +Here is an example of a simple start run event not adding any facet information: +``` +{ + "transition": "START", + "eventTime": "2020-12-09T23:37:31.081Z", + "run": { + "runId": "345", + }, + "job": { + "namespace": "my-scheduler-namespace", + "name": "myjob.mytask", + }, + "inputs": [ + { + "namespace": "my-datasource-namespace", + "name": "instance.schema.table", + } + ], + "outputs": [ + { + "namespace": "my-datasource-namespace", + "name": "instance.schema.output_table", + } + ], + "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client" +} +``` + +### Lifecycle + +The OpenLineage API defines events to capture the lifecycle of a *Run* for a given *Job*. +When a *job* is being *run*, we capture metadata by sending run events when the state of the job transitions to a different state. +We might observe different aspects of the job run at different stages. This means that different metadata might be collected in each event during the lyfecycle of a run. +All metadata is additive. for example, if more inputs or outputs are detected as the job is running we might send additional events specifically for those datasets without re-emiting previously observed inputs or outputs. +Example: + - When the run starts, we collect the following Metadata: + - Run Id + - Job id + - transition: START + - event time + - source location and version (ex: git sha) + - If known: Job inputs and outputs. (input schema, ...) + - When the run completes: + - Run Id + - Job id + - transition: COMPLETE + - event time + - Output datasets schema (and other metadata). + + + +### Facets + +Facets are pieces of metadata that can be attached to the core entities: +- Run +- Job +- Dataset + +A facet is an atomic piece of metadata identified by its name. This means that emiting a new facet whith the same name for the same entity replaces the previous facet instance for that entity entirely). It is defined as a JSON object that can be either part of the spec or custom facets defined in a different project. + +Custom facets must use a distinct prefix named after the project defining them to avoid colision with standard facets defined in the [OpenLineage.yml](OpenLineage.yml) OpenAPI spec. +They have a schemaURL field pointing to the corresponding version of the facet schema (as a [$ref URL location](https://swagger.io/docs/specification/using-ref/) ). + +Example: https://github.com/OpenLineage/OpenLineage/blob/v1/spec/OpenLineage.yml#MyCustomJobFacet + +The versioned URL must be an immutable pointer to the version of the facet schema. For example, it should include a tag of a git sha and not a branch name. This should also be a canonical URL. There should be only one URL used for a given version of a schema. + +Custom facets can be promoted to the standard by including them in the spec. + +### Standard Facets + +#### Run Facets + +- **nominalTime**: Captures the time this run is scheduled for. This is a typical usage for time based scheduled job. The job has a nominal schedule time that will be different from the actual time it is running at. + +- **parent**: Captures the parent job and Run when the run was spawn from a parent run. For example in the case of Airflow, there's a run for the DAG that then spawns runs for individual tasks that would refer to the parent run as the DAG run. Similarly when a SparkOperator starts a Spark job, this creates a separate run that refers to the task run as its parent. + +#### Job Facets + +- **sourceCodeLocation**: Captures the source code location and version (example: git sha) of the job. + +- **sql**: Capture the SQL query if this job is a SQL query. + +#### Dataset Facets + +- **schema**: Captures the schema of the dataset + +- **dataSource**: Captures the Database instance containing this datasets (ex: Database schema. Object store bucket, ...) diff --git a/spec/OpenLineage.yml b/spec/OpenLineage.yml new file mode 100644 index 0000000000000..463eeeff10e97 --- /dev/null +++ b/spec/OpenLineage.yml @@ -0,0 +1,299 @@ +openapi: 3.0.2 +info: + title: OpenLineage + version: 0.0.1 + description: OpenLineage is an open source **lineage and metadata collection API** for the data ecosystem. + license: + name: Apache 2.0 + url: http://www.apache.org/licenses/LICENSE-2.0.html +paths: + /lineage: + post: + summary: Send an event related to the state of a run + description: Updates a run state for a job. + operationId: postRunStateUpdate + tags: + - OpenLineage + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/RunStateUpdate' + responses: + '200': + description: OK + +components: + schemas: + RunStateUpdate: + type: object + properties: + eventType: + description: the current transition of the run state. It is required to issue 1 START event and 1 of [ COMPLETE, ABORT, FAIL ] event per run. Additional events with OTHER eventType can be added to the same run. For example to send additional metadata after the run is complete + type: string + enum: [START, COMPLETE, ABORT, FAIL, OTHER] + example: 'START|COMPLETE|ABORT|FAIL|OTHER' + eventTime: + description: the time the event occured at + type: string + format: date-time + run: + $ref: '#/components/schemas/Run' + job: + $ref: '#/components/schemas/Job' + inputs: + description: The set of **input** datasets. + type: array + items: + $ref: '#/components/schemas/Dataset' + outputs: + description: The set of **output** datasets. + type: array + items: + $ref: '#/components/schemas/Dataset' + producer: + description: URI identifying the producer of this metadata. For example this could be a git url with a given tag or sha + type: string + format: uri + example: https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client + required: + - run + - job + - eventTime + - producer + + Run: + type: object + properties: + runId: + description: The id of the run, unique relative to the job + type: string + facets: + description: The run facets. + type: object + properties: + nominalTime: + $ref: '#/components/schemas/NominalTimeRunFacet' + parent: + $ref: '#/components/schemas/ParentRunFacet' + additionalProperties: + $ref: '#/components/schemas/CustomFacet' + required: + - runId + + Job: + type: object + properties: + namespace: + description: The namespace containing that job + type: string + example: "my-scheduler-namespace" + name: + description: The unique name for that job within that namespace + type: string + example: "myjob.mytask" + facets: + description: The job facets. + type: object + properties: + documentation: + $ref: '#/components/schemas/DocumentationJobFacet' + sourceCodeLocation: + $ref: '#/components/schemas/SourceCodeLocationJobFacet' + sql: + $ref: '#/components/schemas/SQLJobFacet' + additionalProperties: + $ref: '#/components/schemas/CustomFacet' + required: + - namespace + - name + + Dataset: + type: object + properties: + namespace: + description: The namespace containing that dataset + type: string + example: "my-datasource-namespace" + name: + description: The unique name for that dataset within that namespace + type: string + example: "instance.schema.table" + facets: + description: The facets for this dataset + type: object + properties: + documentation: + $ref: '#/components/schemas/DocumentationDatasetFacet' + schema: + $ref: '#/components/schemas/SchemaDatasetFacet' + dataSource: + $ref: '#/components/schemas/DatasourceDatasetFacet' + additionalProperties: + $ref: '#/components/schemas/CustomFacet' + required: + - namespace + - name + +# Base Facet + + BaseFacet: + description: all fields of the base facet are prefixed with _ to avoid name conflicts in facets + type: object + properties: + _producer: + description: URI identifying the producer of this metadata. For example this could be a git url with a given tag or sha + type: string + format: uri + example: https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client + _schemaURL: + description: The URL to the corresponding version of the schema definition following a $ref URL Reference (see https://swagger.io/docs/specification/using-ref/) + type: string + format: uri + example: https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.yml#MyCustomJobFacet + required: + - producer + - schemaURL + +# Custom facets + CustomFacet: + description: a Custom Facet is defined is a separate spec. field must not start with _ + allOf: + - $ref: '#/components/schemas/BaseFacet' + - type: object + additionalProperties: true + +## Run Facets + NominalTimeRunFacet: + allOf: + - $ref: '#/components/schemas/BaseFacet' + - type: object + properties: + nominalStartTime: + description: An [ISO-8601](https://en.wikipedia.org/wiki/ISO_8601) timestamp representing the nominal start time (included) of the run. AKA the schedule time + type: string + format: date-time + example: "2020-12-17T03:00:00.000Z" + nominalEndTime: + description: An [ISO-8601](https://en.wikipedia.org/wiki/ISO_8601) timestamp representing the nominal end time (excluded) of the run. (Should be the nominal start time of the next run) + type: string + format: date-time + example: "2020-12-17T04:00:00.000Z" + required: + - nominalStartTime + + ParentRunFacet: + description: the id of the parent run and job, iff this run was spawn from an other run (for example, the Dag run scheduling its tasks) + allOf: + - $ref: '#/components/schemas/BaseFacet' + - type: object + properties: + run: + type: object + properties: + runId: + type: string + format: uuid + required: + - runId + job: + properties: + namespace: + description: The namespace containing that job + type: string + example: "my-scheduler-namespace" + name: + description: The unique name for that job within that namespace + type: string + example: "myjob.mytask" + required: + - namespace + - name + required: + - run + - job + +## Job Facets + DocumentationJobFacet: + allOf: + - $ref: '#/components/schemas/BaseFacet' + - type: object + properties: + description: + description: The description of the job. + type: string + required: + - description + + SourceCodeLocationJobFacet: + allOf: + - $ref: '#/components/schemas/BaseFacet' + - type: object + properties: + type: + type: string + example: git + url: + type: string + format: uri + + SQLJobFacet: + allOf: + - $ref: '#/components/schemas/BaseFacet' + - type: object + properties: + query: + type: string + example: SELECT * FROM foo + required: + - query + +## Dataset facets + DocumentationDatasetFacet: + allOf: + - $ref: '#/components/schemas/BaseFacet' + - type: object + properties: + description: + description: The description of the dataset. + type: string + example: "canonical representation of entity Foo" + required: + - description + + SchemaDatasetFacet: + allOf: + - $ref: '#/components/schemas/BaseFacet' + - type: object + properties: + fields: + description: The fields of the table. + type: array + items: + type: object + properties: + name: + description: The name of the field. + type: string + example: column1 + type: + description: The type of the field. + type: string + example: VARCHAR|INT|... + description: + description: The description of the field. + type: string + required: + - name + - type + + DatasourceDatasetFacet: + allOf: + - $ref: '#/components/schemas/BaseFacet' + - type: object + properties: + name: + type: string + uri: + type: string + format: uri \ No newline at end of file diff --git a/spec/OpenLineageModel.svg b/spec/OpenLineageModel.svg new file mode 100644 index 0000000000000..6e9bf409e3fc8 --- /dev/null +++ b/spec/OpenLineageModel.svg @@ -0,0 +1 @@ + \ No newline at end of file