Skip to content

Commit

Permalink
#421 data/model lineage java17 upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
csun-cpointe committed Oct 23, 2024
1 parent 91857d9 commit 8cd088d
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 46 deletions.
3 changes: 3 additions & 0 deletions build-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@
<version.helm.plugin>6.14.3</version.helm.plugin>
<version.helm.min>3.12.0</version.helm.min>
<version.snakeyaml>2.0</version.snakeyaml>

<!-- OpenLineage java -->
<version.open.lineage.java>1.23.0</version.open.lineage.java>
</properties>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,6 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-fault-tolerance</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-reactive-jackson</artifactId>
Expand Down Expand Up @@ -96,9 +88,9 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<version>2.35.1</version>
<groupId>org.wiremock</groupId>
<artifactId>wiremock-standalone</artifactId>
<version>3.9.1</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -109,7 +101,7 @@
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-java</artifactId>
<version>0.22.0</version>
<version>${version.open.lineage.java}</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM docker.io/nvidia/cuda:12.1.1-base-ubuntu22.04
FROM docker.io/nvidia/cuda:12.6.2-base-ubuntu22.04

LABEL org.opencontainers.image.source="https://github.com/boozallen/aissemble"

Expand All @@ -11,7 +11,7 @@ RUN chmod 755 /bin/uname

RUN apt-get update \
&& apt-get upgrade -y \
&& apt-get install -y openjdk-11-jdk \
&& apt-get install -y openjdk-17-jdk \
&& update-ca-certificates \
&& rm -rf /var/lib/apt/lists/* \
&& apt-get clean
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,16 @@
<artifactId>foundation-data-lineage-java</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>com.boozallen.aissemble</groupId>
<artifactId>foundation-messaging-java</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-java</artifactId>
<version>${version.open.lineage.java}</version>
</dependency>

<!-- Test dependencies: -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,12 @@
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-java</artifactId>
<version>0.22.0</version>
<version>${version.open.lineage.java}</version>
</dependency>
<dependency>
<groupId>org.technologybrewery.krausening</groupId>
<artifactId>krausening</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>com.boozallen.aissemble</groupId>
<artifactId>foundation-messaging-java</artifactId>
Expand All @@ -73,6 +65,25 @@
<artifactId>foundation-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
<version>${version.smallrye.reactive.messaging}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-api</artifactId>
<version>${version.smallrye.reactive.messaging}</version>
</dependency>
<dependency>
<groupId>com.boozallen.aissemble</groupId>
<artifactId>foundation-core-java</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>


<!-- Test dependencies: -->
Expand All @@ -99,6 +110,18 @@
<version>${version.cucumber.reporting.plugin}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.weld.se</groupId>
<artifactId>weld-se-core</artifactId>
<version>${version.weld}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.weld</groupId>
<artifactId>weld-api</artifactId>
<version>5.0.SP3</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class DefaultDatasetFacet extends OpenLineage.DefaultDatasetFacet {
private final URI schemaUrl;

public DefaultDatasetFacet(String schemaUrl, URI producer) {
super(producer);
super(producer, null);
this.schemaUrl = URI.create(schemaUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class DefaultJobFacet extends OpenLineage.DefaultJobFacet {
private final URI schemaUrl;

public DefaultJobFacet(String schemaUrl, URI producer) {
super(producer);
super(producer, null);
this.schemaUrl = URI.create(schemaUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ public OpenLineage.RunEvent getOpenLineageRunEvent() {
});
}

return openLineage.newRunEvent(EventType.valueOf(this.eventType),
zTime,
return openLineage.newRunEvent(zTime, EventType.valueOf(this.eventType),
run.getOpenLineageRun(),
job.getOpenLineageJob(),
!olInputs.isEmpty() ? olInputs : null,
Expand Down
3 changes: 0 additions & 3 deletions foundation/foundation-mda/src/main/resources/profiles.json
Original file line number Diff line number Diff line change
Expand Up @@ -542,9 +542,6 @@
},
{
"name": "mlflowLineageProperties"
},
{
"name": "trainingGitkeepFile"
}
]
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
data.lineage.enabled=true
data.lineage.producer=${scmUrl}
data.lineage.producer=${scmUrl}
data.lineage.namespace=default
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,11 @@ class ${pipeline.capitalizedName}Base(ABC):
#if ($pipeline.trainingStep.isModelLineageEnabled())
# pylint: disable-next=assignment-from-none
event_data = self.create_base_lineage_event_data()
default_namespace = self.get_default_namespace()
#end
start = datetime.utcnow()
#if ($pipeline.trainingStep.isModelLineageEnabled())
self.record_lineage(self.create_lineage_start_event(run_id=run_id, job_name=job_name, parent_run_facet=parent_run_facet, event_data=event_data, start_time=start))
self.record_lineage(self.create_lineage_start_event(run_id=run_id, job_name=job_name, default_namespace=default_namespace, parent_run_facet=parent_run_facet, event_data=event_data, start_time=start))
#end
loaded_dataset = self.load_dataset()
prepped_dataset = self.prep_dataset(loaded_dataset)
Expand All @@ -186,12 +187,12 @@ class ${pipeline.capitalizedName}Base(ABC):
self.log_information(start, end, loaded_dataset, features)
self.logger.info('Complete')
#if ($pipeline.trainingStep.isModelLineageEnabled())
self.record_lineage(self.create_lineage_complete_event(run_id=run_id, job_name=job_name, parent_run_facet=parent_run_facet, event_data=event_data, start_time=start, end_time=end))
self.record_lineage(self.create_lineage_complete_event(run_id=run_id, job_name=job_name, default_namespace=default_namespace, parent_run_facet=parent_run_facet, event_data=event_data, start_time=start, end_time=end))

#end
except Exception as error:
#if ($pipeline.trainingStep.isModelLineageEnabled())
self.record_lineage(self.create_lineage_fail_event(run_id=run_id, job_name=job_name, event_data=event_data, parent_run_facet=parent_run_facet, start_time=start, end_time=datetime.now(), error=error))
self.record_lineage(self.create_lineage_fail_event(run_id=run_id, job_name=job_name, event_data=event_data, default_namespace=default_namespace, parent_run_facet=parent_run_facet, start_time=start, end_time=datetime.now(), error=error))
PipelineBase().record_pipeline_lineage_fail_event()
#end
raise Exception(error)
Expand Down Expand Up @@ -237,7 +238,7 @@ class ${pipeline.capitalizedName}Base(ABC):

return LineageEventData(job_facets=job_facets, run_facets=run_facets, event_inputs=[input_dataset])

def create_lineage_start_event(self, run_id: str = None, job_name: str = "", parent_run_facet: ParentRunFacet = None, event_data: LineageEventData = None, **kwargs) -> RunEvent:
def create_lineage_start_event(self, run_id: str = None, job_name: str = "", default_namespace:str = None, parent_run_facet: ParentRunFacet = None, event_data: LineageEventData = None, **kwargs) -> RunEvent:
"""
Creates the Start RunEvent with given uuid, parent run facet, job name, lineage data event or any input parameters
To customize the event, override the customize_lineage_start_event(...) function to include the job facets, run facets
Expand All @@ -253,6 +254,7 @@ class ${pipeline.capitalizedName}Base(ABC):
run_id=run_id,
parent_run_facet=parent_run_facet,
job_name=job_name,
default_namespace=default_namespace,
event_data=event_data)
event = self.customize_lineage_start_event(event, **kwargs)
return self.customize_run_event(event)
Expand All @@ -273,7 +275,7 @@ class ${pipeline.capitalizedName}Base(ABC):

return event

def create_lineage_complete_event(self, run_id: str = None, job_name: str = "", parent_run_facet: ParentRunFacet = None, event_data: LineageEventData = None, **kwargs) -> RunEvent:
def create_lineage_complete_event(self, run_id: str = None, job_name: str = "", default_namespace:str = None, parent_run_facet: ParentRunFacet = None, event_data: LineageEventData = None, **kwargs) -> RunEvent:
"""
Creates the Complete RunEvent with given uuid, parent run facet, job name, lineage data event or any input parameters
To customize the event, override the customize_lineage_complete_event(...) function to include the job facets, run facets
Expand All @@ -289,6 +291,7 @@ class ${pipeline.capitalizedName}Base(ABC):
run_id=run_id,
parent_run_facet=parent_run_facet,
job_name=job_name,
default_namespace=default_namespace,
event_data=event_data)
event = self.customize_lineage_complete_event(event, **kwargs)
return self.customize_run_event(event)
Expand All @@ -305,7 +308,7 @@ class ${pipeline.capitalizedName}Base(ABC):
event.run.facets.update(self.record_run_end(kwargs["start_time"], kwargs["end_time"]))
return event

def create_lineage_fail_event(self, run_id: str = None, job_name: str = "", parent_run_facet: ParentRunFacet = None, event_data: LineageEventData = None, **kwargs) -> RunEvent:
def create_lineage_fail_event(self, run_id: str = None, job_name: str = "", default_namespace:str = None, parent_run_facet: ParentRunFacet = None, event_data: LineageEventData = None, **kwargs) -> RunEvent:
"""
Creates the Fail RunEvent with given uuid, parent run facet, job name, lineage data event or any input parameters
To customize the event, override the customize_lineage_fail_event(...) function to include the job facets, run facets
Expand All @@ -321,6 +324,7 @@ class ${pipeline.capitalizedName}Base(ABC):
run_id=run_id,
parent_run_facet=parent_run_facet,
job_name=job_name,
default_namespace=default_namespace,
event_data=event_data)
event = self.customize_lineage_fail_event(event, **kwargs)
return self.customize_run_event(event)
Expand Down Expand Up @@ -377,6 +381,11 @@ class ${pipeline.capitalizedName}Base(ABC):
"""
return "${pipeline.capitalizedName}.${pipeline.trainingStep.name}"

def get_default_namespace(self) -> str:
"""
The default namespace is the Pipeline name. Override this function to change the default namespace.
"""
return "${pipeline.capitalizedName}"
#end

def set_dataset_origin(self, origin: str) -> None:
Expand Down

0 comments on commit 8cd088d

Please sign in to comment.