Skip to content

Commit

Permalink
fix(lineage) Fix lineage viz with multiple siblings (datahub-project#…
Browse files Browse the repository at this point in the history
  • Loading branch information
chriscollins3456 authored and szalai1 committed Dec 22, 2022
1 parent 100b077 commit d360b42
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -57,7 +58,8 @@ public CompletableFuture<EntityLineageResult> get(DataFetchingEnvironment enviro
start != null ? start : 0,
count != null ? count : 100,
1,
separateSiblings != null ? input.getSeparateSiblings() : false
separateSiblings != null ? input.getSeparateSiblings() : false,
new HashSet<>()
));
} catch (URISyntaxException e) {
log.error("Failed to fetch lineage for {}", urn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class SiblingGraphService {
@Nonnull
public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction, int offset,
int count, int maxHops) {
// Convenience overload: forwards every argument unchanged to the longer getLineage overload,
// passing false for separateSiblings.
// NOTE(review): two return statements appear back to back because this text looks like a rendered
// diff whose +/- markers were stripped -- presumably the first is the pre-change line and the second
// (which also supplies a fresh, empty HashSet as the visited-URN set) is its replacement. Confirm
// against the real file before treating this as compilable code.
return getLineage(entityUrn, direction, offset, count, maxHops, false);
return getLineage(entityUrn, direction, offset, count, maxHops, false, new HashSet<>());
}

/**
Expand All @@ -45,7 +45,7 @@ public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDi
*/
@Nonnull
public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction,
int offset, int count, int maxHops, boolean separateSiblings) {
int offset, int count, int maxHops, boolean separateSiblings, @Nonnull Set<Urn> visitedUrns) {
if (separateSiblings) {
return _graphService.getLineage(entityUrn, direction, offset, count, maxHops);
}
Expand Down Expand Up @@ -73,10 +73,15 @@ public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDi
offset = Math.max(0, offset - entityLineage.getTotal());
count = Math.max(0, count - entityLineage.getRelationships().size());

visitedUrns.add(entityUrn);
// iterate through each sibling and include their lineage in the bunch
for (Urn siblingUrn : siblingUrns) {
if (visitedUrns.contains(siblingUrn)) {
continue;
}
// need to call siblingGraphService to get sibling results for this sibling entity in case there is more than one sibling
EntityLineageResult nextEntityLineage = filterLineageResultFromSiblings(siblingUrn, allSiblingsInGroup,
_graphService.getLineage(siblingUrn, direction, offset, count, maxHops), entityLineage);
getLineage(siblingUrn, direction, offset, count, maxHops, false, visitedUrns), entityLineage);

// Update offset and count to fetch the correct number of edges from the next sibling node
offset = Math.max(0, offset - nextEntityLineage.getTotal());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,94 @@ public void testUpstreamOfSiblings() throws Exception {
assertEquals(upstreamLineage, expectedResult);
}

// Lineage must be merged across siblings of siblings, not just direct siblings.
// Fixture: dataset1 lists dataset2 as its sibling; dataset2 lists dataset1 and dataset3;
// dataset3 lists dataset2. Only dataset3 has an upstream edge, pointing at dataset4.
// Requesting upstream lineage for dataset1 should therefore yield dataset4.
@Test
public void testUpstreamOfSiblingSiblings() throws Exception {
  // --- Lineage fixtures ---------------------------------------------------------------------
  EntityLineageResult datasetThreeLineage = new EntityLineageResult();
  EntityLineageResult expectedLineage = new EntityLineageResult();

  LineageRelationshipArray datasetThreeEdges = new LineageRelationshipArray();
  LineageRelationshipArray expectedEdges = new LineageRelationshipArray();

  // The single upstream edge, shared by the mocked graph answer and the expected answer.
  LineageRelationship edgeToDatasetFour = new LineageRelationship();
  edgeToDatasetFour.setDegree(0);
  edgeToDatasetFour.setType(downstreamOf);
  edgeToDatasetFour.setEntity(datasetFourUrn);

  datasetThreeEdges.add(edgeToDatasetFour);
  expectedEdges.add(edgeToDatasetFour);

  // What the sibling-aware service is expected to hand back for dataset1.
  expectedLineage.setCount(1);
  expectedLineage.setStart(0);
  expectedLineage.setTotal(1);
  expectedLineage.setRelationships(expectedEdges);

  // What the underlying graph service reports for dataset3.
  datasetThreeLineage.setStart(0);
  datasetThreeLineage.setTotal(1);
  datasetThreeLineage.setCount(1);
  datasetThreeLineage.setRelationships(datasetThreeEdges);

  // What the underlying graph service reports for dataset1 and dataset2: nothing.
  EntityLineageResult noLineage = new EntityLineageResult();
  noLineage.setRelationships(new LineageRelationshipArray());
  noLineage.setStart(0);
  noLineage.setTotal(0);
  noLineage.setCount(0);

  // --- Graph service stubs: one-hop upstream lookups per dataset ----------------------------
  Mockito.when(_graphService.getLineage(
      Mockito.eq(datasetOneUrn),
      Mockito.eq(LineageDirection.UPSTREAM),
      Mockito.anyInt(),
      Mockito.anyInt(),
      Mockito.eq(1)))
      .thenReturn(noLineage);

  Mockito.when(_graphService.getLineage(
      Mockito.eq(datasetTwoUrn),
      Mockito.eq(LineageDirection.UPSTREAM),
      Mockito.anyInt(),
      Mockito.anyInt(),
      Mockito.eq(1)))
      .thenReturn(noLineage);

  Mockito.when(_graphService.getLineage(
      Mockito.eq(datasetThreeUrn),
      Mockito.eq(LineageDirection.UPSTREAM),
      Mockito.anyInt(),
      Mockito.anyInt(),
      Mockito.eq(1)))
      .thenReturn(datasetThreeLineage);

  // --- Sibling aspects: dataset1 <-> dataset2 <-> dataset3, dataset4 standalone -------------
  Siblings siblingsOfDatasetOne = new Siblings();
  siblingsOfDatasetOne.setPrimary(true);
  siblingsOfDatasetOne.setSiblings(new UrnArray(ImmutableList.of(datasetTwoUrn)));
  Mockito.when(_mockEntityService.getLatestAspect(datasetOneUrn, SIBLINGS_ASPECT_NAME))
      .thenReturn(siblingsOfDatasetOne);

  Siblings siblingsOfDatasetTwo = new Siblings();
  siblingsOfDatasetTwo.setPrimary(true);
  siblingsOfDatasetTwo.setSiblings(new UrnArray(ImmutableList.of(datasetOneUrn, datasetThreeUrn)));
  Mockito.when(_mockEntityService.getLatestAspect(datasetTwoUrn, SIBLINGS_ASPECT_NAME))
      .thenReturn(siblingsOfDatasetTwo);

  Siblings siblingsOfDatasetThree = new Siblings();
  siblingsOfDatasetThree.setPrimary(true);
  siblingsOfDatasetThree.setSiblings(new UrnArray(ImmutableList.of(datasetTwoUrn)));
  Mockito.when(_mockEntityService.getLatestAspect(datasetThreeUrn, SIBLINGS_ASPECT_NAME))
      .thenReturn(siblingsOfDatasetThree);

  Siblings siblingsOfDatasetFour = new Siblings();
  siblingsOfDatasetFour.setPrimary(true);
  siblingsOfDatasetFour.setSiblings(new UrnArray());
  Mockito.when(_mockEntityService.getLatestAspect(datasetFourUrn, SIBLINGS_ASPECT_NAME))
      .thenReturn(siblingsOfDatasetFour);

  // Batch aspect lookup returns the same sibling aspects, keyed by dataset URN.
  Map<Urn, List<RecordTemplate>> siblingAspectsByUrn = ImmutableMap.of(
      datasetOneUrn, ImmutableList.of(siblingsOfDatasetOne),
      datasetTwoUrn, ImmutableList.of(siblingsOfDatasetTwo),
      datasetThreeUrn, ImmutableList.of(siblingsOfDatasetThree),
      datasetFourUrn, ImmutableList.of(siblingsOfDatasetFour));
  Mockito.when(_mockEntityService.getLatestAspects(Mockito.any(), Mockito.any()))
      .thenReturn(siblingAspectsByUrn);

  // --- Exercise and verify ------------------------------------------------------------------
  SiblingGraphService siblingGraphService = _client;
  EntityLineageResult actualLineage =
      siblingGraphService.getLineage(datasetOneUrn, LineageDirection.UPSTREAM, 0, 100, 1);

  assertEquals(actualLineage, expectedLineage);
}

static Urn createFromString(@Nonnull String rawUrn) {
try {
return Urn.createFromString(rawUrn);
Expand Down

0 comments on commit d360b42

Please sign in to comment.