diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/DataMigrationStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/DataMigrationStep.java index eb5ad3e307a58..043f2f1e473db 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/DataMigrationStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/DataMigrationStep.java @@ -17,7 +17,6 @@ import com.linkedin.metadata.entity.ebean.EbeanAspectV1; import com.linkedin.metadata.entity.ebean.EbeanAspectV2; import com.linkedin.metadata.models.EntitySpec; -import com.linkedin.metadata.search.utils.BrowsePathUtils; import io.ebean.EbeanServer; import io.ebean.PagedList; import java.net.URISyntaxException; @@ -151,7 +150,7 @@ public Function executable() { // Emit a browse path aspect. final BrowsePaths browsePaths; try { - browsePaths = BrowsePathUtils.buildBrowsePath(urn, _entityService.getEntityRegistry()); + browsePaths = _entityService.buildDefaultBrowsePath(urn); final AuditStamp browsePathsStamp = new AuditStamp(); browsePathsStamp.setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)); diff --git a/docker/datahub-gms/env/docker-without-neo4j.env b/docker/datahub-gms/env/docker-without-neo4j.env index 1dbe5f4614439..45b8e8c5eda54 100644 --- a/docker/datahub-gms/env/docker-without-neo4j.env +++ b/docker/datahub-gms/env/docker-without-neo4j.env @@ -43,3 +43,7 @@ ENTITY_SERVICE_ENABLE_RETENTION=true # set ELASTICSEARCH_USE_SSL=true and uncomment: # ELASTICSEARCH_USERNAME= # ELASTICSEARCH_PASSWORD= + +# Uncomment to run a one-time upgrade to migrate legacy default browse path format to latest format +# More details can be found at https://datahubproject.io/docs/advanced/browse-paths-upgrade +# UPGRADE_DEFAULT_BROWSE_PATHS_ENABLED=true \ No newline at end of file diff --git a/docker/datahub-gms/env/docker.env b/docker/datahub-gms/env/docker.env index f768a5e4ec7e7..1b859aa59b144 100644 --- 
a/docker/datahub-gms/env/docker.env +++ b/docker/datahub-gms/env/docker.env @@ -72,3 +72,7 @@ UI_INGESTION_DEFAULT_CLI_VERSION=0.8.42 # Uncomment to increase concurrency across Kafka consumers # KAFKA_LISTENER_CONCURRENCY=2 + +# Uncomment to run a one-time upgrade to migrate legacy default browse path format to latest format +# More details can be found at https://datahubproject.io/docs/advanced/browse-paths-upgrade +# UPGRADE_DEFAULT_BROWSE_PATHS_ENABLED=true diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index f4ad3fb2f7409..4dcabe7937bf4 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -433,6 +433,7 @@ module.exports = { "metadata-ingestion/adding-source", "docs/how/add-custom-ingestion-source", "docs/how/add-custom-data-platform", + "docs/advanced/browse-paths-upgrade", ], }, ], diff --git a/docs/advanced/browse-paths-upgrade.md b/docs/advanced/browse-paths-upgrade.md new file mode 100644 index 0000000000000..e440a35c3af46 --- /dev/null +++ b/docs/advanced/browse-paths-upgrade.md @@ -0,0 +1,137 @@ +# Browse Paths Upgrade (August 2022) + +## Background + +Up to this point, there's been a historical constraint on all entity browse paths. Namely, each browse path has been +required to end with a path component that represents "simple name" for an entity. For example, a Browse Path for a +Snowflake Table called "test_table" may look something like this: + +``` +/prod/snowflake/warehouse1/db1/test_table +``` + +In the UI, we artificially truncate the final path component when you are browsing the Entity hierarchy, so your browse experience +would be: + +`prod` > `snowflake` > `warehouse1`> `db1` > `Click Entity` + +As you can see, the final path component `test_table` is effectively ignored. It could have any value, and we would still ignore +it in the UI. This behavior serves as a workaround to the historical requirement that all browse paths end with a simple name. 
+ +This data constraint stands in opposition to the original intention of Browse Paths: to provide a simple mechanism for organizing +assets into a hierarchical folder structure. For this reason, we've changed the semantics of Browse Paths to better align with the original intention. +Going forward, you will not be required to provide a final component detailing the "name". Instead, you will be able to provide a simpler path that +omits this final component: + +``` +/prod/snowflake/warehouse1/db1 +``` + +and the browse experience from the UI will continue to work as you would expect: + +`prod` > `snowflake` > `warehouse1` > `db1` > `Click Entity`. + +With this change comes a fix to a longstanding bug where multiple browse paths could not be attached to a single URN. Going forward, +we will support producing multiple browse paths for the same entity, and allow you to traverse via multiple paths. For example + +```python +browse_path = BrowsePathsClass( + paths=["/powerbi/my/custom/path", "/my/other/custom/path"] +) +return MetadataChangeProposalWrapper( + entityType="dataset", + changeType="UPSERT", + entityUrn="urn:li:dataset:(urn:li:dataPlatform:custom,MyFileName,PROD)", + aspectName="browsePaths", + aspect=browse_path, +) +``` +*Using the Python Emitter SDK to produce multiple Browse Paths for the same entity* + +We've received multiple bug reports, such as [this issue](https://github.com/datahub-project/datahub/issues/5525), and requests to address these issues with Browse, and thus are deciding +to do it now before more workarounds are created. + +## What this means for you + +Once you upgrade to DataHub `v0.8.45` you will immediately notice that traversing your Browse Path hierarchy will require +one extra click to find the entity. This is because we are correctly displaying the FULL browse path, including the simple name mentioned above. + +There will be 2 ways to upgrade to the new browse path format. 
Depending on your ingestion sources, you may want to use one or both: + +1. Migrate default browse paths to the new format by restarting DataHub +2. Upgrade your version of the `datahub` CLI to push new browse path format (version `v0.8.45`) + +Each step will be discussed in detail below. + +### 1. Migrating default browse paths to the new format + +To migrate those Browse Paths that are generated by DataHub by default (when no path is provided), simply restart the `datahub-gms` container / pod with a single +additional environment variable: + +``` +UPGRADE_DEFAULT_BROWSE_PATHS_ENABLED=true +``` + +And restart the `datahub-gms` instance. This will cause GMS to perform a boot-time migration of all your existing Browse Paths +to the new format, removing the unnecessary name component at the very end. + +If the migration is successful, you'll see the following in your GMS logs: + +``` +18:58:17.414 [main] INFO c.l.m.b.s.UpgradeDefaultBrowsePathsStep:60 - Successfully upgraded all browse paths! +``` + +After this one-time migration is complete, you should be able to navigate the Browse hierarchy exactly as you did previously. + +> Note that certain ingestion sources actively produce their own Browse Paths, which override the default path +> computed by DataHub. +> +> In these cases, getting the updated Browse Path will require re-running your ingestion process with the updated +> version of the connector. This is discussed in more detail in the next section. + +### 2. Upgrading the `datahub` CLI to push new browse paths + +If you are actively ingesting metadata from one or more of the following sources: + +1. Sagemaker +2. Looker / LookML +3. Feast +4. Kafka +5. Mode +6. PowerBi +7. Pulsar +8. Tableau +9. Business Glossary + +You will need to upgrade the DataHub CLI to >= `v0.8.45` and re-run metadata ingestion. This will generate the new browse path format +and overwrite the existing paths for entities that were extracted from these sources. 
+ +### If you are producing custom Browse Paths + +If you've decided to produce your own custom Browse Paths to organize your assets (e.g. via the Python Emitter SDK), you'll want to change the code to produce those paths +to truncate the final path component. For example, if you were previously emitting a browse path like this: + +``` +"my/custom/browse/path/suffix" +``` + +You can simply remove the final "suffix" piece: + +``` +"my/custom/browse/path" +``` + +Your users will be able to find the entity by traversing through these folders in the UI: + +`my` > `custom` > `browse`> `path` > `Click Entity`. + + +> Note that if you are using the Browse Path Transformer you *will* be impacted in the same way. It is recommended that you revisit the +> paths that you are producing, and update them to the new format. + +## Support + +The Acryl team will be on standby to assist you in your migration. Please +join [#release-0_8_0](https://datahubspace.slack.com/archives/C0244FHMHJQ) channel and reach out to us if you find +trouble with the upgrade or have feedback on the process. We will work closely to make sure you can continue to operate +DataHub smoothly. diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 66aeb6699b4c5..c81e72be8c126 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -6,6 +6,9 @@ This file documents any backwards-incompatible changes in DataHub and assists pe ### Breaking Changes +- Browse Paths have been upgraded to a new format to align more closely with the intention of the feature. 
+ Learn more about the changes, including steps on upgrading, here: https://datahubproject.io/docs/advanced/browse-paths-upgrade + ### Potential Downtime ### Deprecations diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java index 5846d87d4d56b..e6ac8d10e2137 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java @@ -19,7 +19,9 @@ import com.linkedin.data.schema.TyperefDataSchema; import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.RecordTemplate; +import com.linkedin.data.template.StringArray; import com.linkedin.data.template.UnionTemplate; +import com.linkedin.dataplatform.DataPlatformInfo; import com.linkedin.entity.AspectType; import com.linkedin.entity.Entity; import com.linkedin.entity.EntityResponse; @@ -38,7 +40,6 @@ import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.query.ListUrnsResult; import com.linkedin.metadata.run.AspectRowSummary; -import com.linkedin.metadata.search.utils.BrowsePathUtils; import com.linkedin.metadata.snapshot.Snapshot; import com.linkedin.metadata.utils.DataPlatformInstanceUtils; import com.linkedin.metadata.utils.EntityKeyUtils; @@ -73,6 +74,7 @@ import lombok.extern.slf4j.Slf4j; import static com.linkedin.metadata.Constants.*; +import static com.linkedin.metadata.search.utils.BrowsePathUtils.*; import static com.linkedin.metadata.utils.PegasusUtils.*; @@ -142,7 +144,6 @@ public static class IngestProposalResult { public static final String BROWSE_PATHS = "browsePaths"; public static final String DATA_PLATFORM_INSTANCE = "dataPlatformInstance"; protected static final int MAX_KEYS_PER_QUERY = 500; - public static final String STATUS = "status"; public EntityService( @Nonnull final AspectDao aspectDao, @@ -1180,10 +1181,8 @@ public List> 
generateDefaultAspectsIfMissing(@Nonnu if (shouldCheckBrowsePath && latestAspects.get(BROWSE_PATHS) == null) { try { - BrowsePaths generatedBrowsePath = BrowsePathUtils.buildBrowsePath(urn, getEntityRegistry()); - if (generatedBrowsePath != null) { - aspects.add(Pair.of(BROWSE_PATHS, generatedBrowsePath)); - } + BrowsePaths generatedBrowsePath = buildDefaultBrowsePath(urn); + aspects.add(Pair.of(BROWSE_PATHS, generatedBrowsePath)); } catch (URISyntaxException e) { log.error("Failed to parse urn: {}", urn); } @@ -1762,4 +1761,54 @@ private RecordTemplate updateAspect( return newValue; } + + /** + * Builds the default browse path aspects for a subset of well-supported entities. + * + * This method currently supports datasets, charts, dashboards, data flows, data jobs, and glossary terms. + */ + @Nonnull + public BrowsePaths buildDefaultBrowsePath(final @Nonnull Urn urn) throws URISyntaxException { + Character dataPlatformDelimiter = getDataPlatformDelimiter(urn); + String defaultBrowsePath = getDefaultBrowsePath(urn, this.getEntityRegistry(), dataPlatformDelimiter); + StringArray browsePaths = new StringArray(); + browsePaths.add(defaultBrowsePath); + BrowsePaths browsePathAspect = new BrowsePaths(); + browsePathAspect.setPaths(browsePaths); + return browsePathAspect; + } + + /** + * Returns a delimiter on which the name of an asset may be split. + */ + private Character getDataPlatformDelimiter(Urn urn) { + // Attempt to construct the appropriate Data Platform URN + Urn dataPlatformUrn = buildDataPlatformUrn(urn, this.getEntityRegistry()); + if (dataPlatformUrn != null) { + // Attempt to resolve the delimiter from Data Platform Info + DataPlatformInfo dataPlatformInfo = getDataPlatformInfo(dataPlatformUrn); + if (dataPlatformInfo != null && dataPlatformInfo.hasDatasetNameDelimiter()) { + return dataPlatformInfo.getDatasetNameDelimiter().charAt(0); + } + } + // Else, fallback to a default delimiter (period) if one cannot be resolved. 
+ return '.'; + } + + @Nullable + private DataPlatformInfo getDataPlatformInfo(Urn urn) { + try { + final EntityResponse entityResponse = getEntityV2( + Constants.DATA_PLATFORM_ENTITY_NAME, + urn, + ImmutableSet.of(Constants.DATA_PLATFORM_INFO_ASPECT_NAME) + ); + if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.DATA_PLATFORM_INFO_ASPECT_NAME)) { + return new DataPlatformInfo(entityResponse.getAspects().get(Constants.DATA_PLATFORM_INFO_ASPECT_NAME).getValue().data()); + } + } catch (Exception e) { + log.warn(String.format("Failed to find Data Platform Info for urn %s", urn)); + } + return null; + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java index 4e0eb2489dbee..f141d80ac6bfc 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java @@ -127,12 +127,12 @@ public Map aggregateByValue(@Nullable String entityName, @Nonnull @Nonnull @Override - public BrowseResult browse(@Nonnull String entityName, @Nonnull String path, @Nullable Filter requestParams, int from, + public BrowseResult browse(@Nonnull String entityName, @Nonnull String path, @Nullable Filter filters, int from, int size) { log.debug( - String.format("Browsing entities entityName: %s, path: %s, requestParams: %s, from: %s, size: %s", entityName, - path, requestParams, from, size)); - return esBrowseDAO.browse(entityName, path, requestParams, from, size); + String.format("Browsing entities entityName: %s, path: %s, filters: %s, from: %s, size: %s", entityName, + path, filters, from, size)); + return esBrowseDAO.browse(entityName, path, filters, from, size); } @Nonnull diff --git 
a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAO.java index b5b92088fe423..fcbe62a801c71 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAO.java @@ -39,7 +39,6 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude; import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -77,15 +76,15 @@ private class BrowseGroupsResult { * * @param entityName type of entity to query * @param path the path to be browsed - * @param requestParams the request map with fields and values as filters + * @param filters the request map with fields and values as filters * @param from index of the first entity located in path * @param size the max number of entities contained in the response * @return a {@link BrowseResult} that contains a list of groups/entities */ @Nonnull - public BrowseResult browse(@Nonnull String entityName, @Nonnull String path, @Nullable Filter requestParams, int from, + public BrowseResult browse(@Nonnull String entityName, @Nonnull String path, @Nullable Filter filters, int from, int size) { - final Map requestMap = SearchUtils.getRequestMap(requestParams); + final Map requestMap = SearchUtils.getRequestMap(filters); try { final String indexName = indexConvention.getIndexName(entityRegistry.getEntitySpec(entityName)); @@ -179,7 +178,7 @@ protected SearchRequest constructGroupsSearchRequest(@Nonnull String indexName, @Nonnull private 
QueryBuilder buildQueryString(@Nonnull String path, @Nonnull Map requestMap, boolean isGroupQuery) { - final int browseDepthVal = getPathDepth(path) + 1; + final int browseDepthVal = getPathDepth(path); final BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); @@ -236,7 +235,6 @@ private BrowseGroupsResult extractGroupsResponse(@Nonnull SearchResponse groupsR final ParsedTerms groups = groupsResponse.getAggregations().get(GROUP_AGG); final List groupsAgg = groups.getBuckets() .stream() - .filter(this::validateBucket) .map(group -> new BrowseResultGroup().setName(getSimpleName(group.getKeyAsString())) .setCount(group.getDocCount())) .collect(Collectors.toList()); @@ -247,18 +245,6 @@ private BrowseGroupsResult extractGroupsResponse(@Nonnull SearchResponse groupsR (int) groupsResponse.getHits().getTotalHits().value); } - /** - * Check if there are any paths that extends the matchedPath signifying that the path does not point to an entity - */ - private boolean validateBucket(@Nonnull MultiBucketsAggregation.Bucket bucket) { - final ParsedTerms groups = bucket.getAggregations().get(ALL_PATHS); - final String matchedPath = bucket.getKeyAsString(); - return groups.getBuckets() - .stream() - .map(MultiBucketsAggregation.Bucket::getKeyAsString) - .anyMatch(bucketPath -> (bucketPath.length() > matchedPath.length() && bucketPath.startsWith(matchedPath))); - } - /** * Extracts entity search response into list of browse result entities. 
* @@ -273,11 +259,8 @@ List extractEntitiesResponse(@Nonnull SearchResponse entitie Arrays.stream(entitiesResponse.getHits().getHits()).forEach(hit -> { try { final List allPaths = (List) hit.getSourceAsMap().get(BROWSE_PATH); - final String nextLevelPath = getNextLevelPath(allPaths, currentPath); - if (nextLevelPath != null) { - entityMetadataArray.add(new BrowseResultEntity().setName(getSimpleName(nextLevelPath)) - .setUrn(Urn.createFromString((String) hit.getSourceAsMap().get(URN)))); - } + entityMetadataArray.add(new BrowseResultEntity().setName((String) hit.getSourceAsMap().get(URN)) + .setUrn(Urn.createFromString((String) hit.getSourceAsMap().get(URN)))); } catch (URISyntaxException e) { log.error("URN is not valid: " + e.toString()); } @@ -286,7 +269,7 @@ List extractEntitiesResponse(@Nonnull SearchResponse entitie } /** - * Extracts the name of group/entity from path. + * Extracts the name of group from path. * *

Example: /foo/bar/baz => baz * @@ -298,17 +281,6 @@ private String getSimpleName(@Nonnull String path) { return path.substring(path.lastIndexOf('/') + 1); } - @VisibleForTesting - @Nullable - static String getNextLevelPath(@Nonnull List paths, @Nonnull String currentPath) { - final String normalizedCurrentPath = currentPath.toLowerCase(); - final int pathDepth = getPathDepth(currentPath); - return paths.stream() - .filter(x -> x.toLowerCase().startsWith(normalizedCurrentPath) && getPathDepth(x) == (pathDepth + 1)) - .findFirst() - .orElse(null); - } - private static int getPathDepth(@Nonnull String path) { return StringUtils.countMatches(path, "/"); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/BrowsePathUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/BrowsePathUtils.java index c76d35b41a11c..870064aadc486 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/BrowsePathUtils.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/BrowsePathUtils.java @@ -1,9 +1,9 @@ package com.linkedin.metadata.search.utils; -import com.linkedin.common.BrowsePaths; import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.schema.RecordDataSchema; -import com.linkedin.data.template.StringArray; +import com.linkedin.metadata.Constants; import com.linkedin.metadata.key.ChartKey; import com.linkedin.metadata.key.DashboardKey; import com.linkedin.metadata.key.DataFlowKey; @@ -11,30 +11,87 @@ import com.linkedin.metadata.key.DataPlatformKey; import com.linkedin.metadata.key.DatasetKey; import com.linkedin.metadata.key.GlossaryTermKey; +import com.linkedin.metadata.key.NotebookKey; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.utils.EntityKeyUtils; import java.net.URISyntaxException; +import java.util.Arrays; 
+import java.util.List; +import java.util.regex.Pattern; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; @Slf4j public class BrowsePathUtils { - private BrowsePathUtils() { - //not called + + public static String getDefaultBrowsePath( + @Nonnull Urn urn, + @Nonnull EntityRegistry entityRegistry, + @Nonnull Character dataPlatformDelimiter) throws URISyntaxException { + + switch (urn.getEntityType()) { + case Constants.DATASET_ENTITY_NAME: + DatasetKey dsKey = (DatasetKey) EntityKeyUtils.convertUrnToEntityKey(urn, getKeySchema(urn.getEntityType(), entityRegistry)); + DataPlatformKey dpKey = (DataPlatformKey) EntityKeyUtils.convertUrnToEntityKey( + dsKey.getPlatform(), + getKeySchema(dsKey.getPlatform().getEntityType(), + entityRegistry)); + String datasetNamePath = getDatasetPath(dsKey.getName(), dataPlatformDelimiter); + return ("/" + dsKey.getOrigin() + "/" + dpKey.getPlatformName() + datasetNamePath).toLowerCase(); + case Constants.CHART_ENTITY_NAME: + ChartKey chartKey = (ChartKey) EntityKeyUtils.convertUrnToEntityKey(urn, getKeySchema(urn.getEntityType(), entityRegistry)); + return ("/" + chartKey.getDashboardTool()); + case Constants.DASHBOARD_ENTITY_NAME: // TODO -> Improve the quality of our browse path here. + DashboardKey dashboardKey = (DashboardKey) EntityKeyUtils.convertUrnToEntityKey(urn, getKeySchema(urn.getEntityType(), entityRegistry)); + return ("/" + dashboardKey.getDashboardTool()).toLowerCase(); + case Constants.DATA_FLOW_ENTITY_NAME: // TODO -> Improve the quality of our browse path here. + DataFlowKey dataFlowKey = (DataFlowKey) EntityKeyUtils.convertUrnToEntityKey(urn, getKeySchema(urn.getEntityType(), entityRegistry)); + return ("/" + dataFlowKey.getOrchestrator() + "/" + dataFlowKey.getCluster()) + .toLowerCase(); + case Constants.DATA_JOB_ENTITY_NAME: // TODO -> Improve the quality of our browse path here. 
+ DataJobKey dataJobKey = (DataJobKey) EntityKeyUtils.convertUrnToEntityKey(urn, getKeySchema(urn.getEntityType(), entityRegistry)); + DataFlowKey parentFlowKey = (DataFlowKey) EntityKeyUtils.convertUrnToEntityKey(dataJobKey.getFlow(), + getKeySchema(dataJobKey.getFlow().getEntityType(), entityRegistry)); + return ("/" + parentFlowKey.getOrchestrator() + "/" + parentFlowKey.getCluster()).toLowerCase(); + default: + return ""; + } } - public static BrowsePaths buildBrowsePath(Urn urn, EntityRegistry registry) throws URISyntaxException { - String defaultBrowsePath = getDefaultBrowsePath(urn, registry); - StringArray browsePaths = new StringArray(); - browsePaths.add(defaultBrowsePath); - BrowsePaths browsePathAspect = new BrowsePaths(); - browsePathAspect.setPaths(browsePaths); - return browsePathAspect; + @Nullable + public static Urn buildDataPlatformUrn(Urn urn, EntityRegistry entityRegistry) { + switch (urn.getEntityType()) { + case Constants.DATASET_ENTITY_NAME: + DatasetKey dsKey = (DatasetKey) EntityKeyUtils.convertUrnToEntityKey(urn, getKeySchema(urn.getEntityType(), entityRegistry)); + return dsKey.getPlatform(); + case Constants.CHART_ENTITY_NAME: + ChartKey chartKey = (ChartKey) EntityKeyUtils.convertUrnToEntityKey(urn, getKeySchema(urn.getEntityType(), entityRegistry)); + return UrnUtils.getUrn(String.format("urn:li:%s:%s", Constants.DATA_PLATFORM_ENTITY_NAME, chartKey.getDashboardTool())); + case Constants.DASHBOARD_ENTITY_NAME: + DashboardKey dashboardKey = (DashboardKey) EntityKeyUtils.convertUrnToEntityKey(urn, getKeySchema(urn.getEntityType(), entityRegistry)); + return UrnUtils.getUrn(String.format("urn:li:%s:%s", Constants.DATA_PLATFORM_ENTITY_NAME, dashboardKey.getDashboardTool())); + case Constants.DATA_FLOW_ENTITY_NAME: + DataFlowKey dataFlowKey = (DataFlowKey) EntityKeyUtils.convertUrnToEntityKey(urn, getKeySchema(urn.getEntityType(), entityRegistry)); + return UrnUtils.getUrn(String.format("urn:li:%s:%s", Constants.DATA_PLATFORM_ENTITY_NAME, 
dataFlowKey.getOrchestrator())); + case Constants.DATA_JOB_ENTITY_NAME: + DataJobKey dataJobKey = (DataJobKey) EntityKeyUtils.convertUrnToEntityKey(urn, getKeySchema(urn.getEntityType(), entityRegistry)); + DataFlowKey parentFlowKey = (DataFlowKey) EntityKeyUtils.convertUrnToEntityKey(dataJobKey.getFlow(), + getKeySchema(dataJobKey.getFlow().getEntityType(), entityRegistry)); + return UrnUtils.getUrn(String.format("urn:li:%s:%s", Constants.DATA_PLATFORM_ENTITY_NAME, parentFlowKey.getOrchestrator())); + case Constants.NOTEBOOK_ENTITY_NAME: + NotebookKey notebookKey = (NotebookKey) EntityKeyUtils.convertUrnToEntityKey(urn, getKeySchema(urn.getEntityType(), entityRegistry)); + return UrnUtils.getUrn(String.format("urn:li:%s:%s", Constants.DATA_PLATFORM_ENTITY_NAME, notebookKey.getNotebookTool())); + default: + // Could not resolve a data platform + return null; + } } - public static String getDefaultBrowsePath(Urn urn, EntityRegistry entityRegistry) throws URISyntaxException { + public static String getLegacyDefaultBrowsePath(Urn urn, EntityRegistry entityRegistry) throws URISyntaxException { switch (urn.getEntityType()) { case "dataset": DatasetKey dsKey = (DatasetKey) EntityKeyUtils.convertUrnToEntityKey(urn, getKeySchema(urn.getEntityType(), entityRegistry)); @@ -69,6 +126,21 @@ public static String getDefaultBrowsePath(Urn urn, EntityRegistry entityRegistry } } + /** + * Attempts to convert a dataset name into a proper browse path by splitting it using the Data Platform delimiter. + * If there are not > 1 name parts, then an empty string will be returned. + */ + private static String getDatasetPath(@Nonnull final String datasetName, @Nonnull final Character delimiter) { + if (datasetName.contains(delimiter.toString())) { + final List datasetNamePathParts = Arrays.asList(datasetName.split(Pattern.quote(delimiter.toString()))); + System.out.println(datasetNamePathParts); + // Omit the name from the path. 
+ final String datasetPath = String.join("/", datasetNamePathParts.subList(0, datasetNamePathParts.size() - 1)); + return datasetPath.startsWith("/") ? datasetPath : String.format("/%s", datasetPath); + } + return ""; + } + protected static RecordDataSchema getKeySchema( final String entityName, final EntityRegistry registry) { @@ -76,4 +148,6 @@ protected static RecordDataSchema getKeySchema( final AspectSpec keySpec = spec.getKeyAspectSpec(); return keySpec.getPegasusSchema(); } + + private BrowsePathUtils() { } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAOTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAOTest.java index 6959c0023bcd1..1b2259626a9da 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAOTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAOTest.java @@ -4,7 +4,6 @@ import com.linkedin.metadata.entity.TestEntityRegistry; import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; import java.net.URISyntaxException; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -22,7 +21,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNull; public class ESBrowseDAOTest { @@ -43,41 +41,6 @@ public static Urn makeUrn(Object id) { } } - @Test - public void testMatchingPaths() { - List browsePaths = - Arrays.asList("/all/subscriptions/premium_new_signups_v2/subs_new_bookings", "/certified/lls/subs_new_bookings", - "/certified/lls/lex/subs_new_bookings", "/certified/lls/consumer/subs_new_bookings", "/subs_new_bookings", - "/School/Characteristics/General/Embedding/network_standardized_school_embeddings_v3"); - - // Scenario 1: inside /Certified/LLS - String path1 = "/certified/lls"; - 
assertEquals(ESBrowseDAO.getNextLevelPath(browsePaths, path1), "/certified/lls/subs_new_bookings"); - - // Scenario 2: inside /Certified/LLS/Consumer - String path2 = "/certified/lls/consumer"; - assertEquals(ESBrowseDAO.getNextLevelPath(browsePaths, path2), "/certified/lls/consumer/subs_new_bookings"); - - // Scenario 3: inside root directory - String path3 = ""; - assertEquals(ESBrowseDAO.getNextLevelPath(browsePaths, path3), "/subs_new_bookings"); - - // Scenario 4: inside an incorrect path /foo - // this situation should ideally not arise for entity browse queries - String path4 = "/foo"; - assertNull(ESBrowseDAO.getNextLevelPath(browsePaths, path4)); - - // Scenario 5: one of the browse paths isn't normalized - String path5 = "/school/characteristics/general/embedding"; - assertEquals(ESBrowseDAO.getNextLevelPath(browsePaths, path5), - "/School/Characteristics/General/Embedding/network_standardized_school_embeddings_v3"); - - // Scenario 6: current path isn't normalized, which ideally should not be the case - String path6 = "/School/Characteristics/General/Embedding"; - assertEquals(ESBrowseDAO.getNextLevelPath(browsePaths, path6), - "/School/Characteristics/General/Embedding/network_standardized_school_embeddings_v3"); - } - @Test public void testGetBrowsePath() throws Exception { SearchResponse mockSearchResponse = mock(SearchResponse.class); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/utils/BrowsePathUtilsTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/utils/BrowsePathUtilsTest.java new file mode 100644 index 0000000000000..6127326db8ab9 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/utils/BrowsePathUtilsTest.java @@ -0,0 +1,114 @@ +package com.linkedin.metadata.search.utils; + +import com.linkedin.common.FabricType; +import com.linkedin.common.urn.Urn; +import com.linkedin.metadata.entity.TestEntityRegistry; +import com.linkedin.metadata.key.ChartKey; +import 
com.linkedin.metadata.key.DashboardKey; +import com.linkedin.metadata.key.DataFlowKey; +import com.linkedin.metadata.key.DataJobKey; +import com.linkedin.metadata.key.DatasetKey; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.utils.EntityKeyUtils; +import java.net.URISyntaxException; +import org.testng.Assert; +import org.testng.annotations.Test; + + + +public class BrowsePathUtilsTest { + + private final EntityRegistry registry = new TestEntityRegistry(); + + @Test + public void testGetDefaultBrowsePath() throws URISyntaxException { + + // Datasets + DatasetKey datasetKey = new DatasetKey() + .setName("Test.A.B") + .setOrigin(FabricType.PROD) + .setPlatform(Urn.createFromString("urn:li:dataPlatform:kafka")); + Urn datasetUrn = EntityKeyUtils.convertEntityKeyToUrn(datasetKey, "dataset"); + String datasetPath = BrowsePathUtils.getDefaultBrowsePath(datasetUrn, this.registry, '.'); + Assert.assertEquals(datasetPath, "/prod/kafka/test/a"); + + // Charts + ChartKey chartKey = new ChartKey() + .setChartId("Test/A/B") + .setDashboardTool("looker"); + Urn chartUrn = EntityKeyUtils.convertEntityKeyToUrn(chartKey, "chart"); + String chartPath = BrowsePathUtils.getDefaultBrowsePath(chartUrn, this.registry, '/'); + Assert.assertEquals(chartPath, "/looker"); + + // Dashboards + DashboardKey dashboardKey = new DashboardKey() + .setDashboardId("Test/A/B") + .setDashboardTool("looker"); + Urn dashboardUrn = EntityKeyUtils.convertEntityKeyToUrn(dashboardKey, "dashboard"); + String dashboardPath = BrowsePathUtils.getDefaultBrowsePath(dashboardUrn, this.registry, '/'); + Assert.assertEquals(dashboardPath, "/looker"); + + // Data Flows + DataFlowKey dataFlowKey = new DataFlowKey() + .setCluster("test") + .setFlowId("Test/A/B") + .setOrchestrator("airflow"); + Urn dataFlowUrn = EntityKeyUtils.convertEntityKeyToUrn(dataFlowKey, "dataFlow"); + String dataFlowPath = BrowsePathUtils.getDefaultBrowsePath(dataFlowUrn, this.registry, '/'); + 
Assert.assertEquals(dataFlowPath, "/airflow/test"); + + // Data Jobs + DataJobKey dataJobKey = new DataJobKey() + .setFlow(Urn.createFromString("urn:li:dataFlow:(airflow,Test/A/B,test)")) + .setJobId("Job/A/B"); + Urn dataJobUrn = EntityKeyUtils.convertEntityKeyToUrn(dataJobKey, "dataJob"); + String dataJobPath = BrowsePathUtils.getDefaultBrowsePath(dataJobUrn, this.registry, '/'); + Assert.assertEquals(dataJobPath, "/airflow/test"); + } + + @Test + public void testBuildDataPlatformUrn() throws URISyntaxException { + // Datasets + DatasetKey datasetKey = new DatasetKey() + .setName("Test.A.B") + .setOrigin(FabricType.PROD) + .setPlatform(Urn.createFromString("urn:li:dataPlatform:kafka")); + Urn datasetUrn = EntityKeyUtils.convertEntityKeyToUrn(datasetKey, "dataset"); + Urn dataPlatformUrn1 = BrowsePathUtils.buildDataPlatformUrn(datasetUrn, this.registry); + Assert.assertEquals(dataPlatformUrn1, Urn.createFromString("urn:li:dataPlatform:kafka")); + + // Charts + ChartKey chartKey = new ChartKey() + .setChartId("Test/A/B") + .setDashboardTool("looker"); + Urn chartUrn = EntityKeyUtils.convertEntityKeyToUrn(chartKey, "chart"); + Urn dataPlatformUrn2 = BrowsePathUtils.buildDataPlatformUrn(chartUrn, this.registry); + Assert.assertEquals(dataPlatformUrn2, Urn.createFromString("urn:li:dataPlatform:looker")); + + // Dashboards + DashboardKey dashboardKey = new DashboardKey() + .setDashboardId("Test/A/B") + .setDashboardTool("looker"); + Urn dashboardUrn = EntityKeyUtils.convertEntityKeyToUrn(dashboardKey, "dashboard"); + Urn dataPlatformUrn3 = BrowsePathUtils.buildDataPlatformUrn(dashboardUrn, this.registry); + Assert.assertEquals(dataPlatformUrn3, Urn.createFromString("urn:li:dataPlatform:looker")); + + // Data Flows + DataFlowKey dataFlowKey = new DataFlowKey() + .setCluster("test") + .setFlowId("Test/A/B") + .setOrchestrator("airflow"); + Urn dataFlowUrn = EntityKeyUtils.convertEntityKeyToUrn(dataFlowKey, "dataFlow"); + Urn dataPlatformUrn4 = 
BrowsePathUtils.buildDataPlatformUrn(dataFlowUrn, this.registry); + Assert.assertEquals(dataPlatformUrn4, Urn.createFromString("urn:li:dataPlatform:airflow")); + + // Data Jobs + DataJobKey dataJobKey = new DataJobKey() + .setFlow(Urn.createFromString("urn:li:dataFlow:(airflow,Test/A/B,test)")) + .setJobId("Job/A/B"); + Urn dataJobUrn = EntityKeyUtils.convertEntityKeyToUrn(dataJobKey, "dataJob"); + Urn dataPlatformUrn5 = BrowsePathUtils.buildDataPlatformUrn(dataJobUrn, this.registry); + Assert.assertEquals(dataPlatformUrn5, Urn.createFromString("urn:li:dataPlatform:airflow")); + + } +} diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java index cba09ee0d9406..19a117e1f7f6b 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java @@ -6,6 +6,7 @@ import com.linkedin.gms.factory.search.EntitySearchServiceFactory; import com.linkedin.gms.factory.search.SearchDocumentTransformerFactory; import com.linkedin.metadata.boot.BootstrapManager; +import com.linkedin.metadata.boot.BootstrapStep; import com.linkedin.metadata.boot.steps.IngestDataPlatformInstancesStep; import com.linkedin.metadata.boot.steps.IngestDataPlatformsStep; import com.linkedin.metadata.boot.steps.IngestPoliciesStep; @@ -15,14 +16,18 @@ import com.linkedin.metadata.boot.steps.RemoveClientIdAspectStep; import com.linkedin.metadata.boot.steps.RestoreDbtSiblingsIndices; import com.linkedin.metadata.boot.steps.RestoreGlossaryIndices; +import com.linkedin.metadata.boot.steps.UpgradeDefaultBrowsePathsStep; import com.linkedin.metadata.entity.AspectMigrationsDao; import com.linkedin.metadata.entity.EntityService; import 
com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.search.EntitySearchService; import com.linkedin.metadata.search.transformer.SearchDocumentTransformer; +import java.util.ArrayList; +import java.util.List; import javax.annotation.Nonnull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -58,6 +63,9 @@ public class BootstrapManagerFactory { @Qualifier("ingestRetentionPoliciesStep") private IngestRetentionPoliciesStep _ingestRetentionPoliciesStep; + @Value("${bootstrap.upgradeDefaultBrowsePaths.enabled}") + private Boolean _upgradeDefaultBrowsePathsEnabled; + @Bean(name = "bootstrapManager") @Scope("singleton") @Nonnull @@ -74,9 +82,15 @@ protected BootstrapManager createInstance() { final RestoreDbtSiblingsIndices restoreDbtSiblingsIndices = new RestoreDbtSiblingsIndices(_entityService, _entityRegistry); final RemoveClientIdAspectStep removeClientIdAspectStep = new RemoveClientIdAspectStep(_entityService); - return new BootstrapManager( - ImmutableList.of(ingestRootUserStep, ingestPoliciesStep, ingestRolesStep, ingestDataPlatformsStep, - ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep, - removeClientIdAspectStep, restoreDbtSiblingsIndices)); + + final List finalSteps = new ArrayList<>(ImmutableList.of(ingestRootUserStep, ingestPoliciesStep, ingestRolesStep, + ingestDataPlatformsStep, ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep, + removeClientIdAspectStep, restoreDbtSiblingsIndices)); + + if (_upgradeDefaultBrowsePathsEnabled) { + finalSteps.add(new UpgradeDefaultBrowsePathsStep(_entityService)); + } + + return new BootstrapManager(finalSteps); } 
} diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java new file mode 100644 index 0000000000000..9afc1f8c2f8a5 --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java @@ -0,0 +1,135 @@ +package com.linkedin.metadata.boot.steps; + +import com.google.common.collect.ImmutableSet; +import com.linkedin.common.AuditStamp; +import com.linkedin.common.BrowsePaths; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.boot.UpgradeStep; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ListResult; +import com.linkedin.metadata.query.ExtraInfo; +import com.linkedin.metadata.search.utils.BrowsePathUtils; +import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.mxe.SystemMetadata; +import java.util.Set; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; + + +/** + * This is an opt-in optional upgrade step to migrate your browse paths to the new truncated form. + * It is idempotent, can be retried as many times as necessary. 
+ */ +@Slf4j +public class UpgradeDefaultBrowsePathsStep extends UpgradeStep { + + private static final Set ENTITY_TYPES_TO_MIGRATE = ImmutableSet.of( + Constants.DATASET_ENTITY_NAME, + Constants.DASHBOARD_ENTITY_NAME, + Constants.CHART_ENTITY_NAME, + Constants.DATA_JOB_ENTITY_NAME, + Constants.DATA_FLOW_ENTITY_NAME + ); + private static final String VERSION = "1"; + private static final String UPGRADE_ID = "upgrade-default-browse-paths-step"; + private static final Integer BATCH_SIZE = 5000; + + public UpgradeDefaultBrowsePathsStep(EntityService entityService) { + super(entityService, VERSION, UPGRADE_ID); + } + + @Override + public void upgrade() throws Exception { + final AuditStamp auditStamp = + new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); + + int total = 0; + for (String entityType : ENTITY_TYPES_TO_MIGRATE) { + int migratedCount = 0; + do { + log.info(String.format("Upgrading batch %s-%s out of %s of browse paths for entity type %s", + migratedCount, migratedCount + BATCH_SIZE, total, entityType)); + total = getAndMigrateBrowsePaths(entityType, migratedCount, auditStamp); + migratedCount += BATCH_SIZE; + } while (migratedCount < total); + } + log.info("Successfully upgraded all browse paths!"); + } + + @Nonnull + @Override + public ExecutionMode getExecutionMode() { + return ExecutionMode.BLOCKING; // ensure there are no write conflicts. + } + + private int getAndMigrateBrowsePaths(String entityType, int start, AuditStamp auditStamp) + throws Exception { + + final ListResult latestAspects = _entityService.listLatestAspects( + entityType, + Constants.BROWSE_PATHS_ASPECT_NAME, + start, + BATCH_SIZE); + + if (latestAspects.getTotalCount() == 0 || latestAspects.getValues() == null || latestAspects.getMetadata() == null) { + log.debug(String.format("Found 0 browse paths for entity with type %s. 
Skipping migration!", entityType)); + return 0; + } + + if (latestAspects.getValues().size() != latestAspects.getMetadata().getExtraInfos().size()) { + // Bad result -- we should log that we cannot migrate this batch of paths. + log.warn("Failed to match browse path aspects with corresponding urns. Found mismatched length between aspects ({})" + + "and metadata ({}) for metadata {}", + latestAspects.getValues().size(), + latestAspects.getMetadata().getExtraInfos().size(), + latestAspects.getMetadata()); + return latestAspects.getTotalCount(); + } + + for (int i = 0; i < latestAspects.getValues().size(); i++) { + + ExtraInfo info = latestAspects.getMetadata().getExtraInfos().get(i); + RecordTemplate browsePathsRec = latestAspects.getValues().get(i); + + // Assert on 2 conditions: + // 1. The latest browse path aspect contains only 1 browse path + // 2. The latest browse path matches exactly the legacy default path + + Urn urn = info.getUrn(); + BrowsePaths browsePaths = (BrowsePaths) browsePathsRec; + + log.debug(String.format("Inspecting browse path for urn %s, value %s", urn, browsePaths)); + + if (browsePaths.hasPaths() && browsePaths.getPaths().size() == 1) { + String legacyBrowsePath = BrowsePathUtils.getLegacyDefaultBrowsePath(urn, _entityService.getEntityRegistry()); + log.debug(String.format("Legacy browse path for urn %s, value %s", urn, legacyBrowsePath)); + if (legacyBrowsePath.equals(browsePaths.getPaths().get(0))) { + migrateBrowsePath(urn, auditStamp); + } + } + } + + return latestAspects.getTotalCount(); + } + + private void migrateBrowsePath(Urn urn, AuditStamp auditStamp) throws Exception { + BrowsePaths newPaths = _entityService.buildDefaultBrowsePath(urn); + log.debug(String.format("Updating browse path for urn %s to value %s", urn, newPaths)); + MetadataChangeProposal proposal = new MetadataChangeProposal(); + proposal.setEntityUrn(urn); + proposal.setEntityType(urn.getEntityType()); + proposal.setAspectName(Constants.BROWSE_PATHS_ASPECT_NAME); + 
proposal.setChangeType(ChangeType.UPSERT); + proposal.setSystemMetadata(new SystemMetadata().setRunId(EntityService.DEFAULT_RUN_ID).setLastObserved(System.currentTimeMillis())); + proposal.setAspect(GenericRecordUtils.serializeAspect(newPaths)); + _entityService.ingestProposal( + proposal, + auditStamp + ); + } + +} \ No newline at end of file diff --git a/metadata-service/factories/src/main/resources/application.yml b/metadata-service/factories/src/main/resources/application.yml index be259a12724a5..94a18b0b29fb3 100644 --- a/metadata-service/factories/src/main/resources/application.yml +++ b/metadata-service/factories/src/main/resources/application.yml @@ -197,6 +197,10 @@ metadataTests: siblings: enabled: ${ENABLE_SIBLING_HOOK:true} # enable to turn on automatic sibling associations for dbt +bootstrap: + upgradeDefaultBrowsePaths: + enabled: ${UPGRADE_DEFAULT_BROWSE_PATHS_ENABLED:false} # enable to run the upgrade to migrate legacy default browse paths to new ones + featureFlags: showSimplifiedHomepageByDefault: ${SHOW_SIMPLIFIED_HOMEPAGE_BY_DEFAULT:false} # shows a simplified homepage with just datasets, charts and dashboards by default to users. 
this can be configured in user settings diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java new file mode 100644 index 0000000000000..729c1d23306c2 --- /dev/null +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java @@ -0,0 +1,316 @@ +package com.linkedin.metadata.boot.steps; + +import com.google.common.collect.ImmutableList; +import com.linkedin.common.AuditStamp; +import com.linkedin.common.BrowsePaths; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.data.template.StringArray; +import com.linkedin.entity.Aspect; +import com.linkedin.entity.EntityResponse; +import com.linkedin.entity.EnvelopedAspect; +import com.linkedin.entity.EnvelopedAspectMap; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ListResult; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.EntitySpecBuilder; +import com.linkedin.metadata.models.EventSpec; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.query.ExtraInfo; +import com.linkedin.metadata.query.ExtraInfoArray; +import com.linkedin.metadata.query.ListResultMetadata; +import com.linkedin.metadata.search.utils.BrowsePathUtils; +import com.linkedin.metadata.snapshot.Snapshot; +import com.linkedin.mxe.MetadataChangeProposal; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.mockito.Mockito; +import org.testng.annotations.Test; + + +public class 
UpgradeDefaultBrowsePathsStepTest { + + private static final String VERSION_1 = "1"; + private static final String UPGRADE_URN = String.format( + "urn:li:%s:%s", + Constants.DATA_HUB_UPGRADE_ENTITY_NAME, + "upgrade-default-browse-paths-step"); + + @Test + public void testExecuteNoExistingBrowsePaths() throws Exception { + + final EntityService mockService = Mockito.mock(EntityService.class); + final EntityRegistry registry = new TestEntityRegistry(); + Mockito.when(mockService.getEntityRegistry()).thenReturn(registry); + + final Urn upgradeEntityUrn = Urn.createFromString(UPGRADE_URN); + Mockito.when(mockService.getEntityV2( + Mockito.eq(Constants.DATA_HUB_UPGRADE_ENTITY_NAME), + Mockito.eq(upgradeEntityUrn), + Mockito.eq(Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)) + )).thenReturn(null); + + final List browsePaths1 = Collections.emptyList(); + + Mockito.when(mockService.listLatestAspects( + Mockito.eq(Constants.DATASET_ENTITY_NAME), + Mockito.eq(Constants.BROWSE_PATHS_ASPECT_NAME), + Mockito.eq(0), + Mockito.eq(5000) + )).thenReturn(new ListResult<>( + browsePaths1, + new ListResultMetadata().setExtraInfos(new ExtraInfoArray(Collections.emptyList())), + 0, + false, + 0, + 0, + 2)); + initMockServiceOtherEntities(mockService); + + UpgradeDefaultBrowsePathsStep upgradeDefaultBrowsePathsStep = new UpgradeDefaultBrowsePathsStep(mockService); + upgradeDefaultBrowsePathsStep.execute(); + + Mockito.verify(mockService, Mockito.times(1)).listLatestAspects( + Mockito.eq(Constants.DATASET_ENTITY_NAME), + Mockito.eq(Constants.BROWSE_PATHS_ASPECT_NAME), + Mockito.eq(0), + Mockito.eq(5000) + ); + // Verify that 4 aspects are ingested, 2 for the upgrade request / result, but none for ingesting + Mockito.verify(mockService, Mockito.times(2)).ingestProposal( + Mockito.any(MetadataChangeProposal.class), + Mockito.any() + ); + } + + @Test + public void testExecuteFirstTime() throws Exception { + + Urn testUrn1 = 
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset1,PROD)"); + Urn testUrn2 = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset2,PROD)"); + + final EntityService mockService = Mockito.mock(EntityService.class); + final EntityRegistry registry = new TestEntityRegistry(); + Mockito.when(mockService.getEntityRegistry()).thenReturn(registry); + Mockito.when(mockService.buildDefaultBrowsePath(Mockito.eq(testUrn1))).thenReturn( + new BrowsePaths().setPaths(new StringArray(ImmutableList.of("/prod/kafka")))); + Mockito.when(mockService.buildDefaultBrowsePath(Mockito.eq(testUrn2))).thenReturn( + new BrowsePaths().setPaths(new StringArray(ImmutableList.of("/prod/kafka")))); + + final Urn upgradeEntityUrn = Urn.createFromString(UPGRADE_URN); + Mockito.when(mockService.getEntityV2( + Mockito.eq(Constants.DATA_HUB_UPGRADE_ENTITY_NAME), + Mockito.eq(upgradeEntityUrn), + Mockito.eq(Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)) + )).thenReturn(null); + final List browsePaths1 = ImmutableList.of( + new BrowsePaths().setPaths(new StringArray(ImmutableList.of(BrowsePathUtils.getLegacyDefaultBrowsePath(testUrn1, registry)))), + new BrowsePaths().setPaths(new StringArray(ImmutableList.of(BrowsePathUtils.getLegacyDefaultBrowsePath(testUrn2, registry)))) + ); + + final List extraInfos1 = ImmutableList.of( + new ExtraInfo() + .setUrn(testUrn1) + .setVersion(0L) + .setAudit(new AuditStamp().setActor(UrnUtils.getUrn("urn:li:corpuser:test")).setTime(0L)), + new ExtraInfo() + .setUrn(testUrn2) + .setVersion(0L) + .setAudit(new AuditStamp().setActor(UrnUtils.getUrn("urn:li:corpuser:test")).setTime(0L)) + ); + + Mockito.when(mockService.listLatestAspects( + Mockito.eq(Constants.DATASET_ENTITY_NAME), + Mockito.eq(Constants.BROWSE_PATHS_ASPECT_NAME), + Mockito.eq(0), + Mockito.eq(5000) + )).thenReturn(new ListResult<>( + browsePaths1, + new ListResultMetadata().setExtraInfos(new ExtraInfoArray(extraInfos1)), + 2, + 
false, + 2, + 2, + 2)); + initMockServiceOtherEntities(mockService); + + UpgradeDefaultBrowsePathsStep upgradeDefaultBrowsePathsStep = new UpgradeDefaultBrowsePathsStep(mockService); + upgradeDefaultBrowsePathsStep.execute(); + + Mockito.verify(mockService, Mockito.times(1)).listLatestAspects( + Mockito.eq(Constants.DATASET_ENTITY_NAME), + Mockito.eq(Constants.BROWSE_PATHS_ASPECT_NAME), + Mockito.eq(0), + Mockito.eq(5000) + ); + // Verify that 4 aspects are ingested, 2 for the upgrade request / result and 2 for the browse paths + Mockito.verify(mockService, Mockito.times(4)).ingestProposal( + Mockito.any(MetadataChangeProposal.class), + Mockito.any() + ); + } + + @Test + public void testDoesNotRunWhenBrowsePathIsNotQualified() throws Exception { + // Test for browse paths that are not ingested + Urn testUrn3 = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset3,PROD)"); // Do not migrate + Urn testUrn4 = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset4,PROD)"); // Do not migrate + + final EntityService mockService = Mockito.mock(EntityService.class); + final EntityRegistry registry = new TestEntityRegistry(); + Mockito.when(mockService.getEntityRegistry()).thenReturn(registry); + + final Urn upgradeEntityUrn = Urn.createFromString(UPGRADE_URN); + Mockito.when(mockService.getEntityV2( + Mockito.eq(Constants.DATA_HUB_UPGRADE_ENTITY_NAME), + Mockito.eq(upgradeEntityUrn), + Mockito.eq(Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)) + )).thenReturn(null); + + final List browsePaths2 = ImmutableList.of( + new BrowsePaths().setPaths(new StringArray(ImmutableList.of( + BrowsePathUtils.getDefaultBrowsePath(testUrn3, registry, '.')))), + new BrowsePaths().setPaths(new StringArray(ImmutableList.of( + BrowsePathUtils.getLegacyDefaultBrowsePath(testUrn4, registry), + BrowsePathUtils.getDefaultBrowsePath(testUrn4, registry, '.')))) + ); + + final List extraInfos2 = ImmutableList.of( + new ExtraInfo() 
+ .setUrn(testUrn3) + .setVersion(0L) + .setAudit(new AuditStamp().setActor(UrnUtils.getUrn("urn:li:corpuser:test")).setTime(0L)), + new ExtraInfo() + .setUrn(testUrn4) + .setVersion(0L) + .setAudit(new AuditStamp().setActor(UrnUtils.getUrn("urn:li:corpuser:test")).setTime(0L))); + + + Mockito.when(mockService.listLatestAspects( + Mockito.eq(Constants.DATASET_ENTITY_NAME), + Mockito.eq(Constants.BROWSE_PATHS_ASPECT_NAME), + Mockito.eq(0), + Mockito.eq(5000) + )).thenReturn(new ListResult<>( + browsePaths2, + new ListResultMetadata().setExtraInfos(new ExtraInfoArray(extraInfos2)), + 2, + false, + 2, + 2, + 2)); + initMockServiceOtherEntities(mockService); + + UpgradeDefaultBrowsePathsStep upgradeDefaultBrowsePathsStep = new UpgradeDefaultBrowsePathsStep(mockService); + upgradeDefaultBrowsePathsStep.execute(); + + Mockito.verify(mockService, Mockito.times(1)).listLatestAspects( + Mockito.eq(Constants.DATASET_ENTITY_NAME), + Mockito.eq(Constants.BROWSE_PATHS_ASPECT_NAME), + Mockito.eq(0), + Mockito.eq(5000) + ); + // Verify that 2 aspects are ingested, only those for the upgrade step + Mockito.verify(mockService, Mockito.times(2)).ingestProposal( + Mockito.any(MetadataChangeProposal.class), + Mockito.any() + ); + } + + @Test + public void testDoesNotRunWhenAlreadyExecuted() throws Exception { + final EntityService mockService = Mockito.mock(EntityService.class); + + final Urn upgradeEntityUrn = Urn.createFromString(UPGRADE_URN); + com.linkedin.upgrade.DataHubUpgradeRequest upgradeRequest = new com.linkedin.upgrade.DataHubUpgradeRequest().setVersion(VERSION_1); + Map upgradeRequestAspects = new HashMap<>(); + upgradeRequestAspects.put(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME, + new EnvelopedAspect().setValue(new Aspect(upgradeRequest.data()))); + EntityResponse response = new EntityResponse().setAspects(new EnvelopedAspectMap(upgradeRequestAspects)); + Mockito.when(mockService.getEntityV2( + Mockito.eq(Constants.DATA_HUB_UPGRADE_ENTITY_NAME), + 
Mockito.eq(upgradeEntityUrn), + Mockito.eq(Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)) + )).thenReturn(response); + + UpgradeDefaultBrowsePathsStep step = new UpgradeDefaultBrowsePathsStep(mockService); + step.execute(); + + Mockito.verify(mockService, Mockito.times(0)).ingestProposal( + Mockito.any(MetadataChangeProposal.class), + Mockito.any(AuditStamp.class) + ); + } + + private void initMockServiceOtherEntities(EntityService mockService) { + List skippedEntityTypes = ImmutableList.of( + Constants.DASHBOARD_ENTITY_NAME, + Constants.CHART_ENTITY_NAME, + Constants.DATA_FLOW_ENTITY_NAME, + Constants.DATA_JOB_ENTITY_NAME + ); + for (String entityType : skippedEntityTypes) { + Mockito.when(mockService.listLatestAspects( + Mockito.eq(entityType), + Mockito.eq(Constants.BROWSE_PATHS_ASPECT_NAME), + Mockito.eq(0), + Mockito.eq(5000) + )).thenReturn(new ListResult<>( + Collections.emptyList(), + new ListResultMetadata().setExtraInfos(new ExtraInfoArray(Collections.emptyList())), + 0, + false, + 0, + 0, + 0)); + } + } + + public static class TestEntityRegistry implements EntityRegistry { + + private final Map entityNameToSpec; + + public TestEntityRegistry() { + entityNameToSpec = new EntitySpecBuilder(EntitySpecBuilder.AnnotationExtractionMode.IGNORE_ASPECT_FIELDS) + .buildEntitySpecs(new Snapshot().schema()) + .stream() + .collect(Collectors.toMap(spec -> spec.getName().toLowerCase(), spec -> spec)); + } + + @Nonnull + @Override + public EntitySpec getEntitySpec(@Nonnull final String entityName) { + String lowercaseEntityName = entityName.toLowerCase(); + if (!entityNameToSpec.containsKey(lowercaseEntityName)) { + throw new IllegalArgumentException( + String.format("Failed to find entity with name %s in EntityRegistry", entityName)); + } + return entityNameToSpec.get(lowercaseEntityName); + } + + @Nullable + @Override + public EventSpec getEventSpec(@Nonnull String eventName) { + return null; + } + + @Nonnull + @Override + public Map 
getEntitySpecs() { + return entityNameToSpec; + } + + @Nonnull + @Override + public Map getEventSpecs() { + return Collections.emptyMap(); + } + } +} diff --git a/smoke-test/tests/browse/__init__.py b/smoke-test/tests/browse/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/smoke-test/tests/browse/browse_test.py b/smoke-test/tests/browse/browse_test.py new file mode 100644 index 0000000000000..7bedd37d6a026 --- /dev/null +++ b/smoke-test/tests/browse/browse_test.py @@ -0,0 +1,124 @@ +import json +import urllib +import time + +import pytest +import requests +from tests.utils import delete_urns_from_file, get_frontend_url, ingest_file_via_rest + + +TEST_DATASET_1_URN = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-browse-1,PROD)" +TEST_DATASET_2_URN = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-browse-2,PROD)" +TEST_DATASET_3_URN = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-browse-3,PROD)" + + +@pytest.fixture(scope="module", autouse=False) +def ingest_cleanup_data(request): + print("ingesting browse test data") + ingest_file_via_rest("tests/browse/data.json") + time.sleep(5) # Allow for indexing time + yield + print("removing browse test data") + delete_urns_from_file("tests/browse/data.json") + + +@pytest.mark.dependency() +def test_healthchecks(wait_for_healthchecks): + # Call to wait_for_healthchecks fixture will do the actual functionality. + pass + + +@pytest.mark.dependency(depends=["test_healthchecks"]) +def test_get_browse_paths(frontend_session, ingest_cleanup_data): + + # Iterate through each browse path, starting with the root + + get_browse_paths_query = """query browse($input: BrowseInput!) 
{\n + browse(input: $input) {\n + total\n + entities {\n + urn\n + }\n + groups {\n + name\n + count\n + }\n + metadata {\n + path\n + totalNumEntities\n + }\n + }\n + }""" + + # /prod -- There should be one entity + get_browse_paths_json = { + "query": get_browse_paths_query, + "variables": {"input": { "type": "DATASET", "path": ["prod"], "start": 0, "count": 10 } }, + } + + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=get_browse_paths_json + ) + response.raise_for_status() + res_data = response.json() + + assert res_data + assert res_data["data"] + assert res_data["data"]["browse"] is not None + assert "errors" not in res_data + + browse = res_data["data"]["browse"] + print(browse) + assert browse["entities"] == [{ "urn": TEST_DATASET_3_URN }] + + # /prod/kafka1 + get_browse_paths_json = { + "query": get_browse_paths_query, + "variables": {"input": { "type": "DATASET", "path": ["prod", "kafka1"], "start": 0, "count": 10 } }, + } + + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=get_browse_paths_json + ) + response.raise_for_status() + res_data = response.json() + + assert res_data + assert res_data["data"] + assert res_data["data"]["browse"] is not None + assert "errors" not in res_data + + browse = res_data["data"]["browse"] + assert browse == { + "total": 3, + "entities": [{ "urn": TEST_DATASET_1_URN }, { "urn": TEST_DATASET_2_URN }, { "urn": TEST_DATASET_3_URN }], + "groups": [], + "metadata": { "path": ["prod", "kafka1"], "totalNumEntities": 0 } + } + + # /prod/kafka2 + get_browse_paths_json = { + "query": get_browse_paths_query, + "variables": {"input": { "type": "DATASET", "path": ["prod", "kafka2"], "start": 0, "count": 10 } }, + } + + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=get_browse_paths_json + ) + response.raise_for_status() + res_data = response.json() + + assert res_data + assert res_data["data"] + assert res_data["data"]["browse"] is not 
None + assert "errors" not in res_data + + browse = res_data["data"]["browse"] + assert browse == { + "total": 2, + "entities": [{ "urn": TEST_DATASET_1_URN }, { "urn": TEST_DATASET_2_URN }], + "groups": [], + "metadata": { "path": ["prod", "kafka2"], "totalNumEntities": 0 } + } + + diff --git a/smoke-test/tests/browse/data.json b/smoke-test/tests/browse/data.json new file mode 100644 index 0000000000000..ef776a04c134c --- /dev/null +++ b/smoke-test/tests/browse/data.json @@ -0,0 +1,83 @@ +[ + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,test-browse-1,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": ["/prod/kafka1", "/prod/kafka2"] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "description": null, + "uri": null, + "tags": [], + "customProperties": { + "prop1": "fakeprop", + "prop2": "pikachu" + } + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,test-browse-2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": ["/prod/kafka1", "/prod/kafka2"] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "description": null, + "uri": null, + "tags": [], + "customProperties": { + "prop1": "fakeprop", + "prop2": "pikachu" + } + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,test-browse-3,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": ["/prod", "/prod/kafka1"] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "description": null, + 
"uri": null, + "tags": [], + "customProperties": { + "prop1": "fakeprop", + "prop2": "pikachu" + } + } + } + ] + } + }, + "proposedDelta": null + } +] \ No newline at end of file