Skip to content
This repository has been archived by the owner on Dec 12, 2024. It is now read-only.

Commit

Permalink
starts using PipesMetadataValue
Browse files Browse the repository at this point in the history
  • Loading branch information
marijncv committed Dec 8, 2024
1 parent 5a083ac commit 9cef9b8
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
use dagster_pipes_rust::{open_dagster_pipes, AssetCheckSeverity, DagsterPipesError};
use dagster_pipes_rust::{
open_dagster_pipes, AssetCheckSeverity, DagsterPipesError
};
use dagster_pipes_rust::types::{PipesMetadataValue, RawValue, Type};
use serde_json::json;

use std::collections::HashMap;

fn main() -> Result<(), DagsterPipesError> {
let mut context = open_dagster_pipes()?;
// See supported metadata types here:
// https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/pipes/context.py#L133
let metadata = json!({"row_count": {"raw_value": 100, "type": "int"}});
context.report_asset_materialization("example_rust_subprocess_asset", metadata);

let asset_metadata = HashMap::from([(
"row_count".to_string(),
PipesMetadataValue {
raw_value: Some(RawValue::Integer(100)),
pipes_metadata_value_type: Some(Type::Int),
},
)]);
context.report_asset_materialization("example_rust_subprocess_asset", asset_metadata);

let check_metadata = HashMap::from([(
"quality".to_string(),
PipesMetadataValue{
raw_value: Some(RawValue::Integer(100)),
pipes_metadata_value_type: Some(Type::Int),
},
)]);
context.report_asset_check(
"example_rust_subprocess_check",
true,
"example_rust_subprocess_asset",
AssetCheckSeverity::Warn,
json!({"quality": {"raw_value": 5, "type": "int"}}),
&AssetCheckSeverity::Warn,
check_metadata,
);
Ok(())
}
26 changes: 11 additions & 15 deletions jsonschema/pipes/PipesMetadataValue.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,17 @@
]
},
"raw_value": {
"anyOf": [
{ "type": "integer" },
{ "type": "number" },
{ "type": "string" },
{
"type": "object",
"additionalProperties": true
},
{
"type": "array",
"items": {}
},
{ "type": "boolean" },
{ "type": "null" }
]
"type": [
"integer",
"number",
"string",
"object",
"array",
"boolean",
"null"
],
"additionalProperties": true,
"items": {}
}
}
}
242 changes: 236 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod context_loader;
mod params_loader;
mod types;
pub mod types;

use std::collections::HashMap;
use std::fs::OpenOptions;
Expand All @@ -17,7 +17,7 @@ use crate::context_loader::DefaultLoader as PipesDefaultContextLoader;
pub use crate::context_loader::LoadContext;
use crate::params_loader::EnvVarLoader as PipesEnvVarParamsLoader;
pub use crate::params_loader::LoadParams;
pub use crate::types::{Method, PipesContextData, PipesMessage};
pub use crate::types::{Method, PipesContextData, PipesMessage, PipesMetadataValue};

#[derive(Serialize)]
#[serde(rename_all = "UPPERCASE")]
Expand All @@ -35,10 +35,14 @@ pub struct PipesContext {
}

impl PipesContext {
pub fn report_asset_materialization(&mut self, asset_key: &str, metadata: serde_json::Value) {
pub fn report_asset_materialization(
&mut self,
asset_key: &str,
metadata: HashMap<String, PipesMetadataValue>,
) {
let params: HashMap<String, Option<serde_json::Value>> = HashMap::from([
("asset_key".to_string(), Some(json!(asset_key))),
("metadata".to_string(), Some(metadata)),
("metadata".to_string(), Some(json!(metadata))),
("data_version".to_string(), None), // TODO - support data versions
]);

Expand All @@ -56,14 +60,14 @@ impl PipesContext {
passed: bool,
asset_key: &str,
severity: &AssetCheckSeverity,
metadata: serde_json::Value,
metadata: HashMap<String, PipesMetadataValue>,
) {
let params: HashMap<String, Option<serde_json::Value>> = HashMap::from([
("asset_key".to_string(), Some(json!(asset_key))),
("check_name".to_string(), Some(json!(check_name))),
("passed".to_string(), Some(json!(passed))),
("severity".to_string(), Some(json!(severity))),
("metadata".to_string(), Some(metadata)),
("metadata".to_string(), Some(json!(metadata))),
]);

let msg = PipesMessage {
Expand Down Expand Up @@ -134,3 +138,229 @@ pub fn open_dagster_pipes() -> Result<PipesContext, DagsterPipesError> {
writer: PipesFileMessageWriter { path },
})
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::fs;
use tempfile::NamedTempFile;

use super::*;

#[test]
fn test_write_pipes_metadata() {
let file = NamedTempFile::new().unwrap();
let path = file.path().to_str().unwrap().to_string();

let writer = PipesFileMessageWriter { path };
let asset_metadata = HashMap::from([
(
"text".to_string(),
PipesMetadataValue {
raw_value: Some(types::RawValue::String("hello".to_string())),
pipes_metadata_value_type: Some(types::Type::Text),
},
),
(
"url".to_string(),
PipesMetadataValue {
raw_value: Some(types::RawValue::String("http://someurl.com".to_string())),
pipes_metadata_value_type: Some(types::Type::Url),
},
),
(
"path".to_string(),
PipesMetadataValue {
raw_value: Some(types::RawValue::String("file://some/path".to_string())),
pipes_metadata_value_type: Some(types::Type::Path),
},
),
(
"notebook".to_string(),
PipesMetadataValue {
raw_value: Some(types::RawValue::String("notebook".to_string())),
pipes_metadata_value_type: Some(types::Type::Notebook),
},
),
(
"json_object".to_string(),
PipesMetadataValue {
raw_value: Some(types::RawValue::AnythingMap(HashMap::from([(
"key".to_string(),
Some(json!("value")),
)]))),
pipes_metadata_value_type: Some(types::Type::Json),
},
),
(
"json_array".to_string(),
PipesMetadataValue {
raw_value: Some(types::RawValue::AnythingArray(vec![Some(
json!({"key": "value"}),
)])),
pipes_metadata_value_type: Some(types::Type::Json),
},
),
(
"md".to_string(),
PipesMetadataValue {
raw_value: Some(types::RawValue::String("## markdown".to_string())),
pipes_metadata_value_type: Some(types::Type::Md),
},
),
(
"dagster_run".to_string(),
PipesMetadataValue {
raw_value: Some(types::RawValue::String("1234".to_string())),
pipes_metadata_value_type: Some(types::Type::DagsterRun),
},
),
(
"asset".to_string(),
PipesMetadataValue {
raw_value: Some(types::RawValue::String("some_asset".to_string())),
pipes_metadata_value_type: Some(types::Type::Asset),
},
),
(
"job".to_string(),
PipesMetadataValue {
raw_value: Some(types::RawValue::String("some_job".to_string())),
pipes_metadata_value_type: Some(types::Type::Job),
},
),
(
"timestamp".to_string(),
PipesMetadataValue {
raw_value: Some(types::RawValue::String(
"2012-04-23T18:25:43.511Z".to_string(),
)),
pipes_metadata_value_type: Some(types::Type::Timestamp),
},
),
(
"int".to_string(),
PipesMetadataValue {
raw_value: Some(types::RawValue::Integer(100)),
pipes_metadata_value_type: Some(types::Type::Int),
},
),
(
"float".to_string(),
PipesMetadataValue {
raw_value: Some(types::RawValue::Double(100.0)),
pipes_metadata_value_type: Some(types::Type::Float),
},
),
(
"bool".to_string(),
PipesMetadataValue {
raw_value: Some(types::RawValue::Bool(true)),
pipes_metadata_value_type: Some(types::Type::Bool),
},
),
(
"none".to_string(),
PipesMetadataValue {
raw_value: None,
pipes_metadata_value_type: None,
},
),
]);

let mut context = PipesContext {
data: PipesContextData {
asset_keys: Some(vec!["asset1".to_string()]),
code_version_by_asset_key: None,
extras: None,
job_name: None,
partition_key: None,
partition_key_range: None,
partition_time_window: None,
provenance_by_asset_key: None,
retry_number: 0,
run_id: "012345".to_string(),
},
writer,
};
context.report_asset_materialization("asset1", asset_metadata);

assert_eq!(
serde_json::from_str::<PipesMessage>(&fs::read_to_string(file.path()).unwrap())
.unwrap(),
PipesMessage {
dagster_pipes_version: "0.1".to_string(),
method: Method::ReportAssetMaterialization,
params: Some(HashMap::from([
("asset_key".to_string(), Some(json!("asset1"))),
(
"metadata".to_string(),
Some(json!({
"text": {
"raw_value": "hello",
"type": "text"
},
"url": {
"raw_value": "http://someurl.com",
"type": "url"
},
"path": {
"raw_value": "file://some/path",
"type": "path"
},
"notebook": {
"raw_value": "notebook",
"type": "notebook"
},
"json_object": {
"raw_value": {"key": "value"},
"type": "json"
},
"json_array": {
"raw_value": [{"key": "value"}],
"type": "json"
},
"md": {
"raw_value": "## markdown",
"type": "md"
},
"dagster_run": {
"raw_value": "1234",
"type": "dagster_run"
},
"asset": {
"raw_value": "some_asset",
"type": "asset"
},
"job": {
"raw_value": "some_job",
"type": "job"
},
"timestamp": {
"raw_value": "2012-04-23T18:25:43.511Z",
"type": "timestamp"
},
"int": {
"raw_value": 100,
"type": "int"
},
"float": {
"raw_value": 100.0,
"type": "float"
},
"bool": {
"raw_value": true,
"type": "bool"
},
"none": {
"raw_value": null,
"type": null
}
}))
),
("data_version".to_string(), None),
])),
}
);
}
}
2 changes: 2 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,5 +199,7 @@ pub enum RawValue {

Double(f64),

Integer(i64),

String(String),
}

0 comments on commit 9cef9b8

Please sign in to comment.