Skip to content

Commit

Permalink
chore(testing):[#358] Update testdata upload script to Asset v3 API
Browse files Browse the repository at this point in the history
  • Loading branch information
ds-jhartmann committed Apr 19, 2024
1 parent e47c5e5 commit 9d98a84
Showing 1 changed file with 69 additions and 64 deletions.
133 changes: 69 additions & 64 deletions local/testing/testdata/transform-and-upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,93 +16,98 @@ def create_submodel_payload(json_payload):

def create_edc_asset_payload(submodel_url_, asset_id_):
return json.dumps({
"@context": {},
"asset": {
"@type": "Asset",
"@id": f"{asset_id_}",
"properties": {
"description": "IRS EDC Test Asset"
}
"@context": edc_context(),
"@id": f"{asset_id_}",
"edc:properties": {
"edc:description": "IRS EDC Test Asset",
"edc:id": f"{asset_id_}",
},
"dataAddress": {
"@type": "DataAddress",
"type": "HttpData",
"baseUrl": f"{submodel_url_}",
"proxyPath": "true",
"proxyBody": "false",
"proxyMethod": "false",
"proxyQueryParams": "false"
"edc:dataAddress": {
"@type": "edc:DataAddress",
"edc:type": "HttpData",
"edc:baseUrl": f"{submodel_url_}",
"edc:proxyPath": "true",
"edc:proxyBody": "false",
"edc:proxyMethod": "false",
"edc:proxyQueryParams": "false"
}
})


def create_edc_registry_asset_payload(registry_url_, asset_prop_id_):
return json.dumps({
"@context": {},
"asset": {
"@type": "Asset",
"@id": f"{asset_prop_id_}", # DTR-EDC-instance-unique-ID
"properties": {
"type": "data.core.digitalTwinRegistry",
"description": "Digital Twin Registry Endpoint of IRS DEV"
}
"@context": edc_context(),
"@id": f"{asset_prop_id_}", # DTR-EDC-instance-unique-ID
"edc:properties": {
"edc:description": "Digital Twin Registry Endpoint of IRS DEV",
"edc:id": f"{asset_prop_id_}", # DTR-EDC-instance-unique-ID
"edc:type": "data.core.digitalTwinRegistry"
},
"dataAddress": {
"@type": "DataAddress",
"type": "HttpData",
"baseUrl": f"{registry_url_}",
"proxyPath": "true",
"proxyBody": "true",
"proxyMethod": "true",
"proxyQueryParams": "true"
"edc:dataAddress": {
"@type": "edc:DataAddress",
"edc:type": "HttpData",
"edc:baseUrl": f"{registry_url_}",
"edc:proxyPath": "true",
"edc:proxyMethod": "true",
"edc:proxyQueryParams": "true",
"edc:proxyBody": "true",
"edc:contentType": "application-json"
}
})


def edc_context():
return {
"dct": "https://purl.org/dc/terms/",
"tx": "https://w3id.org/tractusx/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"dcat": "https://www.w3.org/ns/dcat/",
"odrl": "http://www.w3.org/ns/odrl/2/",
"dspace": "https://w3id.org/dspace/v0.8/"
}


def create_edc_notification_asset_payload(ess_url_, asset_id_, notification_type_):
return json.dumps({
"@context": {},
"asset": {
"@id": f"{asset_id_}",
"properties": {
"description": "ESS notification endpoint",
"contenttype": "application/json",
"notificationtype": f"{notification_type_}",
"notificationmethod": "receive"
}
"@context": edc_context(),
"@id": f"{asset_id_}",
"edc:properties": {
"edc:description": "ESS notification endpoint",
"edc:id": f"{asset_id_}",
"edc:notificationtype": f"{notification_type_}",
"edc:notificationmethod": "receive"
},
"dataAddress": {
"baseUrl": f"{ess_url_}",
"type": "HttpData",
"proxyBody": "true",
"proxyMethod": "true"
"edc:dataAddress": {
"@type": "edc:DataAddress",
"edc:type": "HttpData",
"edc:baseUrl": f"{ess_url_}",
"edc:proxyBody": "true",
"edc:proxyMethod": "true",
}
})


def create_esr_edc_asset_payload(esr_url_, asset_prop_id_, digital_twin_id_):
return json.dumps({
"asset": {
"properties": {
"asset:prop:id": asset_prop_id_,
"asset:prop:description": "product description",
"asset:prop:contenttype": "application/json"
}
"@context": edc_context(),
"@id": f"{asset_prop_id_}",
"edc:properties": {
"edc:description": "product description",
"edc:id": f"{asset_prop_id_}",
},
"dataAddress": {
"properties": {
"baseUrl": f"{esr_url_}/{digital_twin_id_}/asBuilt/ISO14001/submodel",
"type": "HttpData",
"proxyBody": True,
"proxyMethod": True
}
"edc:dataAddress": {
"@type": "edc:DataAddress",
"edc:type": "HttpData",
"edc:baseUrl": f"{esr_url_}/{digital_twin_id_}/asBuilt/ISO14001/submodel",
"edc:proxyBody": "true",
"edc:proxyMethod": "true",
}
})


def create_edc_contract_definition_payload(edc_policy_id_, asset_prop_id_):
return json.dumps({
"@context": {},
"@context": edc_context(),
"@type": "ContractDefinition",
"accessPolicyId": f"{edc_policy_id_}",
"contractPolicyId": f"{edc_policy_id_}",
Expand Down Expand Up @@ -476,7 +481,7 @@ def search_for_asset_in_catalog(edc_catalog_path_, edc_upload_url_, edc_url_, he

check_url_args(submodel_server_upload_urls, submodel_server_urls, edc_upload_urls, edc_urls, dataplane_urls)

edc_asset_path = "/management/v2/assets"
edc_asset_path = "/management/v3/assets"
edc_policy_path = "/management/v2/policydefinitions"
edc_contract_definition_path = "/management/v2/contractdefinitions"
edc_catalog_path = "/management/v2/catalog/request"
Expand Down Expand Up @@ -563,7 +568,7 @@ def search_for_asset_in_catalog(edc_catalog_path_, edc_upload_url_, edc_url_, he
" ",
"")
if "PartAsPlanned" in tmp_key:
specific_asset_ids_temp = copy(tmp_data[tmp_key][0]["localIdentifiers"])
# specific_asset_ids_temp = copy(tmp_data[tmp_key][0]["localIdentifiers"])
name_at_manufacturer = tmp_data[tmp_key][0]["partTypeInformation"]["nameAtManufacturer"].replace(
" ",
"")
Expand Down Expand Up @@ -635,9 +640,9 @@ def search_for_asset_in_catalog(edc_catalog_path_, edc_upload_url_, edc_url_, he
endpoint_address = f"{edc_url}/{catenax_id}-{submodel_identification}/submodel?content=value&extent=withBlobValue"

if is_aas3:
endpoint_address = f"{dataplane_url}{dataplane_public_path}/data/{submodel_identification}"
endpoint_address = f"{dataplane_url}{dataplane_public_path}/{submodel_identification}"
if is_ess and tmp_data["bpnl"] in bpnl_fail:
endpoint_address = f"http://idonotexist/{dataplane_public_path}/data/{submodel_identification}"
endpoint_address = f"http://idonotexist/{dataplane_public_path}/{submodel_identification}"
descriptor = create_submodel_descriptor_3_0(submodel_name, submodel_identification, semantic_id,
endpoint_address,
edc_asset_id,
Expand All @@ -657,7 +662,7 @@ def search_for_asset_in_catalog(edc_catalog_path_, edc_upload_url_, edc_url_, he
if tmp_data[tmp_key] != "":
payload = create_submodel_payload(tmp_data[tmp_key][0])
response = session.request(method="POST",
url=f"{submodel_upload_url}/data/{submodel_identification}",
url=f"{submodel_upload_url}/{submodel_identification}",
headers=headers, data=payload)
print_response(response)

Expand Down

0 comments on commit 9d98a84

Please sign in to comment.