Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1433] Add vertex retention to KeyedGrouping. #1470

Open
wants to merge 13 commits into
base: develop
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package org.gradoop.flink.io.impl.gdl;

import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,16 @@ default void addKeyToElement(E element, Object key) {
* @return The key type.
*/
TypeInformation<K> getType();

/**
* The element will not be grouped, if this returns true for all key functions and
* {@link org.gradoop.flink.model.impl.operators.keyedgrouping.KeyedGrouping#setRetainUngroupedVertices(boolean)}
* is set to true.
*
* @param element element to test
* @return true, if element should not be grouped
*/
default boolean retainElement(E element) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.gradoop.flink.model.api.functions;

import java.util.Objects;

/**
* A (grouping) key function with a default value. The value will be used in some cases where the key can
* not be determined or when this key function is not applicable for an element.<p>
Expand All @@ -31,4 +33,18 @@ public interface KeyFunctionWithDefaultValue<E, K> extends KeyFunction<E, K> {
* @return The default key.
*/
K getDefaultKey();

/**
* {@inheritDoc}
* <p>
* The default implementation of this method compares the key to the default key provided by this class
* using {@link Objects#deepEquals(Object, Object)}.
*
* @param element The key to check.
* @return {@code true}, if the key is a default key for this key-function.
*/
@Override
default boolean retainElement(E element) {
return Objects.deepEquals(getDefaultKey(), getKey(element));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -779,13 +779,10 @@ GC extends BaseGraphCollection<G, V, E, LG, GC>> UnaryBaseGraphToBaseGraphOperat
retainVerticesWithoutGroup);
break;
case GROUP_WITH_KEYFUNCTIONS:
if (retainVerticesWithoutGroup) {
throw new UnsupportedOperationException("Retaining vertices without group is not yet supported" +
" with this strategy.");
}
groupingOperator = KeyedGroupingUtils.createInstance(
groupingOperator = KeyedGroupingUtils.<G, V, E, LG, GC>createInstance(
useVertexLabel, useEdgeLabel, vertexLabelGroups, edgeLabelGroups,
globalVertexAggregateFunctions, globalEdgeAggregateFunctions);
globalVertexAggregateFunctions, globalEdgeAggregateFunctions)
.setRetainUngroupedVertices(retainVerticesWithoutGroup);
break;
default:
throw new IllegalArgumentException("Unsupported strategy: " + strategy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.gradoop.flink.model.impl.operators.keyedgrouping;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
Expand All @@ -27,16 +28,24 @@
import org.gradoop.flink.model.api.functions.AggregateFunction;
import org.gradoop.flink.model.api.functions.KeyFunction;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.filters.Not;
import org.gradoop.flink.model.impl.functions.utils.LeftSide;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildSuperEdgeFromTuple;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildSuperVertexFromTuple;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildTuplesFromEdges;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildTuplesFromEdgesWithId;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildTuplesFromVertices;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.CreateElementMappingToSelf;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.FilterEdgesToGroup;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.FilterSuperVertices;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.GroupingConstants;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.PickRetainedEdgeIDs;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.ReduceEdgeTuples;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.ReduceVertexTuples;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.UpdateIdField;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.UpdateIdFieldAndMarkTuple;
import org.gradoop.flink.model.impl.operators.keyedgrouping.labelspecific.WithAllKeysRetained;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -93,6 +102,11 @@ public class KeyedGrouping<
*/
private boolean useGroupCombine = true;

/**
* Should vertices without groups be retained in the result as is.
*/
private boolean retainUngroupedVertices = false;

/**
* Instantiate this grouping function.
*
Expand Down Expand Up @@ -123,33 +137,63 @@ public KeyedGrouping(List<KeyFunction<V, ?>> vertexGroupingKeys,
@Override
public LG execute(LG graph) {
/* First we create tuple representations of each vertex.
If retention of ungrouped vertices is enabled, we filter out retained vertices prior to this step.
Those tuples will then be grouped by the respective key fields (the fields containing the values
extracted by the key functions) and reduced to assign a super vertex and to calculate aggregates. */
DataSet<Tuple> verticesWithSuperVertex = graph.getVertices()
DataSet<V> vertices = graph.getVertices();
DataSet<V> ungrouped = vertices;
if (retainUngroupedVertices) {
final FilterFunction<V> retentionSelector = new WithAllKeysRetained<>(vertexGroupingKeys);
ungrouped = ungrouped.filter(retentionSelector);
vertices = vertices.filter(new Not<>(retentionSelector));
}

/* Group vertices and create super-vertices. Do not yet remove the input vertices. */
DataSet<Tuple> verticesWithSuperVertex = vertices
.map(new BuildTuplesFromVertices<>(vertexGroupingKeys, vertexAggregateFunctions))
.groupBy(getInternalVertexGroupingKeys())
.reduceGroup(new ReduceVertexTuples<>(
GroupingConstants.VERTEX_TUPLE_RESERVED + vertexGroupingKeys.size(), vertexAggregateFunctions));

/* Extract a mapping from vertex-ID to super-vertex-ID from the result of the vertex-reduce step. */
DataSet<Tuple2<GradoopId, GradoopId>> idToSuperId = verticesWithSuperVertex
.filter(new Not<>(new FilterSuperVertices<>()))
.project(GroupingConstants.VERTEX_TUPLE_ID, GroupingConstants.VERTEX_TUPLE_SUPERID);
if (retainUngroupedVertices) {
/* Retained vertices will be mapped to themselves, instead of a super-vertex. */
idToSuperId = idToSuperId.union(ungrouped.map(new CreateElementMappingToSelf<>()));
}

final int edgeOffset = retainUngroupedVertices ?
GroupingConstants.EDGE_RETENTION_OFFSET : GroupingConstants.EDGE_DEFAULT_OFFSET;
/* Create tuple representations of each edge and update the source- and target-ids of those tuples with
with the mapping extracted in the previous step. Edges will then point from and to super-vertices. */
with the mapping extracted in the previous step. Edges will then point from and to super-vertices.
When retention of ungrouped vertices is enabled, we keep track of edge IDs to pick those that point
to and from retained vertices later. The ID is stored at the beginning of the tuple, we therefore
add some additional offset for these operations. */
DataSet<Tuple> edgesWithUpdatedIds = graph.getEdges()
.map(new BuildTuplesFromEdges<>(edgeGroupingKeys, edgeAggregateFunctions))
.map(retainUngroupedVertices ?
new BuildTuplesFromEdgesWithId<>(edgeGroupingKeys, edgeAggregateFunctions) :
new BuildTuplesFromEdges<>(edgeGroupingKeys, edgeAggregateFunctions))
.join(idToSuperId)
.where(GroupingConstants.EDGE_TUPLE_SOURCEID)
.where(GroupingConstants.EDGE_TUPLE_SOURCEID + edgeOffset)
.equalTo(GroupingConstants.VERTEX_TUPLE_ID)
.with(new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_SOURCEID))
.with(retainUngroupedVertices ?
new UpdateIdFieldAndMarkTuple<>(GroupingConstants.EDGE_TUPLE_SOURCEID) :
new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_SOURCEID))
.join(idToSuperId)
.where(GroupingConstants.EDGE_TUPLE_TARGETID)
.where(GroupingConstants.EDGE_TUPLE_TARGETID + edgeOffset)
.equalTo(GroupingConstants.VERTEX_TUPLE_ID)
.with(new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_TARGETID));
.with(retainUngroupedVertices ?
new UpdateIdFieldAndMarkTuple<>(GroupingConstants.EDGE_TUPLE_TARGETID) :
new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_TARGETID));

/* Group the edge-tuples by the key fields and vertex IDs and reduce them to single elements. */
DataSet<Tuple> superEdgeTuples = edgesWithUpdatedIds
/* Group the edge-tuples by the key fields and vertex IDs and reduce them to single elements.
When retention of ungrouped vertices is enabled, we have to filter out edges marked for retention
before the grouping step and then project to remove the additional ID field. */
DataSet<Tuple> superEdgeTuples = (retainUngroupedVertices ? edgesWithUpdatedIds
.filter(new FilterEdgesToGroup<>())
.project(getInternalEdgeProjectionIndices()) : edgesWithUpdatedIds)
.groupBy(getInternalEdgeGroupingKeys())
.reduceGroup(new ReduceEdgeTuples<>(
GroupingConstants.EDGE_TUPLE_RESERVED + edgeGroupingKeys.size(), edgeAggregateFunctions))
Expand All @@ -167,6 +211,18 @@ Those tuples will then be grouped by the respective key fields (the fields conta
.map(new BuildSuperEdgeFromTuple<>(edgeGroupingKeys, edgeAggregateFunctions,
graph.getFactory().getEdgeFactory()));

if (retainUngroupedVertices) {
/* We have to add the previously filtered vertices back. */
superVertices = superVertices.union(ungrouped);
/* We have to select the retained edges and add them back. */
DataSet<GradoopId> retainedEdgeIds = edgesWithUpdatedIds.flatMap(new PickRetainedEdgeIDs<>());
DataSet<E> retainedEdges = graph.getEdges().join(retainedEdgeIds)
.where(new Id<>())
.equalTo("*")
.with(new LeftSide<>());
superEdges = superEdges.union(retainedEdges);
}

return graph.getFactory().fromDataSets(superVertices, superEdges);
}

Expand All @@ -190,6 +246,20 @@ private int[] getInternalVertexGroupingKeys() {
GroupingConstants.VERTEX_TUPLE_RESERVED + vertexGroupingKeys.size()).toArray();
}

/**
* Get the indices to which edge tuples should be projected to remove the additional and at this stage
* no longer required {@link GroupingConstants#EDGE_TUPLE_ID} field. This will effectively return all
* the indices of all fields, except for that ID field.<p>
* This is only needed when {@link #retainUngroupedVertices} is enabled.
*
* @return The edge tuple indices.
*/
private int[] getInternalEdgeProjectionIndices() {
return IntStream.range(GroupingConstants.EDGE_RETENTION_OFFSET, GroupingConstants.EDGE_RETENTION_OFFSET +
GroupingConstants.EDGE_TUPLE_RESERVED + edgeGroupingKeys.size() + edgeAggregateFunctions.size())
.toArray();
}

/**
* Enable or disable an optional combine step before the reduce step.
* Note that this currently only affects the edge reduce step.
Expand All @@ -203,4 +273,25 @@ public KeyedGrouping<G, V, E, LG, GC> setUseGroupCombine(boolean useGroupCombine
this.useGroupCombine = useGroupCombine;
return this;
}

/**
* Enable or disable vertex retention.
* Vertices will be retained, if all key functions return true for
* {@link KeyFunction#retainElement(Object)}.
* For example {@code KeyFunctionWithDefaultValue} returns true,
* if the extracted key equals the default key.
* <p>
* Retained vertices will not be grouped and returned without modifications.
* Edges between retained vertices will not be grouped and returned without modifications.
* Edges between retained vertices and grouped vertices will be grouped.
* <p>
* This is disabled per default.
*
* @param retainVertices Should vertices be retained?
* @return This operator.
*/
public KeyedGrouping<G, V, E, LG, GC> setRetainUngroupedVertices(boolean retainVertices) {
this.retainUngroupedVertices = retainVertices;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,43 @@
*/
public class BuildTuplesFromEdges<E extends Edge> extends BuildTuplesFromElements<E> {

/**
* An additional edge offset. All tuple accesses will be shifted by this value.
*/
private final int offset;

/**
* Initialize this function, setting the grouping keys and aggregate functions.<p>
* This constructor will consider additional reserved fields in the edge tuple.
*
* @param keys The grouping keys.
* @param aggregateFunctions The aggregate functions used to determine the aggregate property
* @param additionalOffset An additional number of fields to be reserved in edge tuples.
*/
protected BuildTuplesFromEdges(List<KeyFunction<E, ?>> keys, List<AggregateFunction> aggregateFunctions,
int additionalOffset) {
super(GroupingConstants.EDGE_TUPLE_RESERVED + additionalOffset, keys, aggregateFunctions);
if (additionalOffset < 0) {
throw new IllegalArgumentException("Additional offset can not be negative: " + additionalOffset);
}
this.offset = additionalOffset;
}

/**
* Initialize this function, setting the grouping keys and aggregate functions.
*
* @param keys The grouping keys.
* @param aggregateFunctions The aggregate functions used to determine the aggregate property
*/
public BuildTuplesFromEdges(List<KeyFunction<E, ?>> keys, List<AggregateFunction> aggregateFunctions) {
super(GroupingConstants.EDGE_TUPLE_RESERVED, keys, aggregateFunctions);
this(keys, aggregateFunctions, 0);
}

@Override
public Tuple map(E element) throws Exception {
final Tuple result = super.map(element);
result.setField(element.getSourceId(), GroupingConstants.EDGE_TUPLE_SOURCEID);
result.setField(element.getTargetId(), GroupingConstants.EDGE_TUPLE_TARGETID);
result.setField(element.getSourceId(), GroupingConstants.EDGE_TUPLE_SOURCEID + offset);
result.setField(element.getTargetId(), GroupingConstants.EDGE_TUPLE_TARGETID + offset);
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.keyedgrouping.functions;

import org.apache.flink.api.java.tuple.Tuple;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.flink.model.api.functions.AggregateFunction;
import org.gradoop.flink.model.api.functions.KeyFunction;

import java.util.List;

/**
* Build a tuple-based representation of edges for grouping with an additional source ID field at position
* {@value GroupingConstants#EDGE_TUPLE_ID}. All other fields will be shifted by
* {@value GroupingConstants#EDGE_RETENTION_OFFSET}.
*
* @param <E> The edge type.
*/
public class BuildTuplesFromEdgesWithId<E extends Edge> extends BuildTuplesFromEdges<E> {

/**
* Initialize this function, setting the grouping keys and aggregate functions.
*
* @param keys The edge grouping keys.
* @param aggregateFunctions The edge aggregate functions.
*/
public BuildTuplesFromEdgesWithId(List<KeyFunction<E, ?>> keys,
List<AggregateFunction> aggregateFunctions) {
super(keys, aggregateFunctions, GroupingConstants.EDGE_RETENTION_OFFSET);
}

@Override
public Tuple map(E element) throws Exception {
final Tuple tuple = super.map(element);
tuple.setField(element.getId(), GroupingConstants.EDGE_TUPLE_ID);
return tuple;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public BuildTuplesFromElements(int tupleDataOffset, List<KeyFunction<E, ?>> keys
reuseTuple = Tuple.newInstance(tupleSize);
// Fill first fields with default ID values.
for (int i = 0; i < tupleDataOffset; i++) {
reuseTuple.setField(GradoopId.NULL_VALUE, i);
// The copy is needed to protect NULL_VALUE from changes. See #1466
reuseTuple.setField(GradoopId.NULL_VALUE.copy(), i);
}
}

Expand Down
Loading