diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/io/impl/gdl/GDLConsoleOutput.java b/gradoop-flink/src/main/java/org/gradoop/flink/io/impl/gdl/GDLConsoleOutput.java index 43ddb4a98932..eef0b0e44b80 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/io/impl/gdl/GDLConsoleOutput.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/io/impl/gdl/GDLConsoleOutput.java @@ -16,8 +16,8 @@ package org.gradoop.flink.io.impl.gdl; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; -import org.gradoop.common.model.api.entities.GraphHead; import org.gradoop.common.model.api.entities.Edge; +import org.gradoop.common.model.api.entities.GraphHead; import org.gradoop.common.model.api.entities.Vertex; import org.gradoop.flink.model.api.epgm.BaseGraph; import org.gradoop.flink.model.api.epgm.BaseGraphCollection; diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/api/functions/KeyFunction.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/api/functions/KeyFunction.java index 99a931536e8e..1080cfc844d1 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/api/functions/KeyFunction.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/api/functions/KeyFunction.java @@ -54,4 +54,16 @@ default void addKeyToElement(E element, Object key) { * @return The key type. */ TypeInformation getType(); + + /** + * The element will not be grouped, if this returns true for all key functions and + * {@link org.gradoop.flink.model.impl.operators.keyedgrouping.KeyedGrouping#setRetainUngroupedVertices(boolean)} + * is set to true. + * + * @param element element to test + * @return true, if element should not be grouped + */ + default boolean retainElement(E element) { + return false; + } } diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/api/functions/KeyFunctionWithDefaultValue.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/api/functions/KeyFunctionWithDefaultValue.java index 56ae90316f57..3306d60e72ec 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/api/functions/KeyFunctionWithDefaultValue.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/api/functions/KeyFunctionWithDefaultValue.java @@ -15,6 +15,8 @@ */ package org.gradoop.flink.model.api.functions; +import java.util.Objects; + /** * A (grouping) key function with a default value. The value will be used in some cases where the key can * not be determined or when this key function is not applicable for an element.

@@ -31,4 +33,18 @@ public interface KeyFunctionWithDefaultValue extends KeyFunction { * @return The default key. */ K getDefaultKey(); + + /** + * {@inheritDoc} + *

+ * The default implementation of this method compares the key to the default key provided by this class + * using {@link Objects#deepEquals(Object, Object)}. + * + * @param element The key to check. + * @return {@code true}, if the key is a default key for this key-function. + */ + @Override + default boolean retainElement(E element) { + return Objects.deepEquals(getDefaultKey(), getKey(element)); + } } diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/grouping/Grouping.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/grouping/Grouping.java index 4d7fd6e5d34e..45b34e93cc6b 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/grouping/Grouping.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/grouping/Grouping.java @@ -779,13 +779,10 @@ GC extends BaseGraphCollection> UnaryBaseGraphToBaseGraphOperat retainVerticesWithoutGroup); break; case GROUP_WITH_KEYFUNCTIONS: - if (retainVerticesWithoutGroup) { - throw new UnsupportedOperationException("Retaining vertices without group is not yet supported" + - " with this strategy."); - } - groupingOperator = KeyedGroupingUtils.createInstance( + groupingOperator = KeyedGroupingUtils.createInstance( useVertexLabel, useEdgeLabel, vertexLabelGroups, edgeLabelGroups, - globalVertexAggregateFunctions, globalEdgeAggregateFunctions); + globalVertexAggregateFunctions, globalEdgeAggregateFunctions) + .setRetainUngroupedVertices(retainVerticesWithoutGroup); break; default: throw new IllegalArgumentException("Unsupported strategy: " + strategy); diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/KeyedGrouping.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/KeyedGrouping.java index fdb681d726d2..ff0e0a74d43d 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/KeyedGrouping.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/KeyedGrouping.java @@ -15,6 +15,7 @@ */ package org.gradoop.flink.model.impl.operators.keyedgrouping; +import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; @@ -27,16 +28,24 @@ import org.gradoop.flink.model.api.functions.AggregateFunction; import org.gradoop.flink.model.api.functions.KeyFunction; import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphOperator; +import org.gradoop.flink.model.impl.functions.epgm.Id; import org.gradoop.flink.model.impl.functions.filters.Not; +import org.gradoop.flink.model.impl.functions.utils.LeftSide; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildSuperEdgeFromTuple; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildSuperVertexFromTuple; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildTuplesFromEdges; +import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildTuplesFromEdgesWithId; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildTuplesFromVertices; +import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.CreateElementMappingToSelf; +import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.FilterEdgesToGroup; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.FilterSuperVertices; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.GroupingConstants; +import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.PickRetainedEdgeIDs; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.ReduceEdgeTuples; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.ReduceVertexTuples; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.UpdateIdField; +import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.UpdateIdFieldAndMarkTuple; +import org.gradoop.flink.model.impl.operators.keyedgrouping.labelspecific.WithAllKeysRetained; import java.util.Collections; import java.util.List; @@ -93,6 +102,11 @@ public class KeyedGrouping< */ private boolean useGroupCombine = true; + /** + * Should vertices without groups be retained in the result as is. + */ + private boolean retainUngroupedVertices = false; + /** * Instantiate this grouping function. * @@ -123,33 +137,63 @@ public KeyedGrouping(List> vertexGroupingKeys, @Override public LG execute(LG graph) { /* First we create tuple representations of each vertex. + If retention of ungrouped vertices is enabled, we filter out retained vertices prior to this step. Those tuples will then be grouped by the respective key fields (the fields containing the values extracted by the key functions) and reduced to assign a super vertex and to calculate aggregates. */ - DataSet verticesWithSuperVertex = graph.getVertices() + DataSet vertices = graph.getVertices(); + DataSet ungrouped = vertices; + if (retainUngroupedVertices) { + final FilterFunction retentionSelector = new WithAllKeysRetained<>(vertexGroupingKeys); + ungrouped = ungrouped.filter(retentionSelector); + vertices = vertices.filter(new Not<>(retentionSelector)); + } + + /* Group vertices and create super-vertices. Do not yet remove the input vertices. */ + DataSet verticesWithSuperVertex = vertices .map(new BuildTuplesFromVertices<>(vertexGroupingKeys, vertexAggregateFunctions)) .groupBy(getInternalVertexGroupingKeys()) .reduceGroup(new ReduceVertexTuples<>( GroupingConstants.VERTEX_TUPLE_RESERVED + vertexGroupingKeys.size(), vertexAggregateFunctions)); + /* Extract a mapping from vertex-ID to super-vertex-ID from the result of the vertex-reduce step. */ DataSet> idToSuperId = verticesWithSuperVertex .filter(new Not<>(new FilterSuperVertices<>())) .project(GroupingConstants.VERTEX_TUPLE_ID, GroupingConstants.VERTEX_TUPLE_SUPERID); + if (retainUngroupedVertices) { + /* Retained vertices will be mapped to themselves, instead of a super-vertex. */ + idToSuperId = idToSuperId.union(ungrouped.map(new CreateElementMappingToSelf<>())); + } + final int edgeOffset = retainUngroupedVertices ? + GroupingConstants.EDGE_RETENTION_OFFSET : GroupingConstants.EDGE_DEFAULT_OFFSET; /* Create tuple representations of each edge and update the source- and target-ids of those tuples with - with the mapping extracted in the previous step. Edges will then point from and to super-vertices. */ + with the mapping extracted in the previous step. Edges will then point from and to super-vertices. + When retention of ungrouped vertices is enabled, we keep track of edge IDs to pick those that point + to and from retained vertices later. The ID is stored at the beginning of the tuple, we therefore + add some additional offset for these operations. */ DataSet edgesWithUpdatedIds = graph.getEdges() - .map(new BuildTuplesFromEdges<>(edgeGroupingKeys, edgeAggregateFunctions)) + .map(retainUngroupedVertices ? + new BuildTuplesFromEdgesWithId<>(edgeGroupingKeys, edgeAggregateFunctions) : + new BuildTuplesFromEdges<>(edgeGroupingKeys, edgeAggregateFunctions)) .join(idToSuperId) - .where(GroupingConstants.EDGE_TUPLE_SOURCEID) + .where(GroupingConstants.EDGE_TUPLE_SOURCEID + edgeOffset) .equalTo(GroupingConstants.VERTEX_TUPLE_ID) - .with(new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_SOURCEID)) + .with(retainUngroupedVertices ? + new UpdateIdFieldAndMarkTuple<>(GroupingConstants.EDGE_TUPLE_SOURCEID) : + new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_SOURCEID)) .join(idToSuperId) - .where(GroupingConstants.EDGE_TUPLE_TARGETID) + .where(GroupingConstants.EDGE_TUPLE_TARGETID + edgeOffset) .equalTo(GroupingConstants.VERTEX_TUPLE_ID) - .with(new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_TARGETID)); + .with(retainUngroupedVertices ? + new UpdateIdFieldAndMarkTuple<>(GroupingConstants.EDGE_TUPLE_TARGETID) : + new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_TARGETID)); - /* Group the edge-tuples by the key fields and vertex IDs and reduce them to single elements. */ - DataSet superEdgeTuples = edgesWithUpdatedIds + /* Group the edge-tuples by the key fields and vertex IDs and reduce them to single elements. + When retention of ungrouped vertices is enabled, we have to filter out edges marked for retention + before the grouping step and then project to remove the additional ID field. */ + DataSet superEdgeTuples = (retainUngroupedVertices ? edgesWithUpdatedIds + .filter(new FilterEdgesToGroup<>()) + .project(getInternalEdgeProjectionIndices()) : edgesWithUpdatedIds) .groupBy(getInternalEdgeGroupingKeys()) .reduceGroup(new ReduceEdgeTuples<>( GroupingConstants.EDGE_TUPLE_RESERVED + edgeGroupingKeys.size(), edgeAggregateFunctions)) @@ -167,6 +211,18 @@ Those tuples will then be grouped by the respective key fields (the fields conta .map(new BuildSuperEdgeFromTuple<>(edgeGroupingKeys, edgeAggregateFunctions, graph.getFactory().getEdgeFactory())); + if (retainUngroupedVertices) { + /* We have to add the previously filtered vertices back. */ + superVertices = superVertices.union(ungrouped); + /* We have to select the retained edges and add them back. */ + DataSet retainedEdgeIds = edgesWithUpdatedIds.flatMap(new PickRetainedEdgeIDs<>()); + DataSet retainedEdges = graph.getEdges().join(retainedEdgeIds) + .where(new Id<>()) + .equalTo("*") + .with(new LeftSide<>()); + superEdges = superEdges.union(retainedEdges); + } + return graph.getFactory().fromDataSets(superVertices, superEdges); } @@ -190,6 +246,20 @@ private int[] getInternalVertexGroupingKeys() { GroupingConstants.VERTEX_TUPLE_RESERVED + vertexGroupingKeys.size()).toArray(); } + /** + * Get the indices to which edge tuples should be projected to remove the additional and at this stage + * no longer required {@link GroupingConstants#EDGE_TUPLE_ID} field. This will effectively return all + * the indices of all fields, except for that ID field.

+ * This is only needed when {@link #retainUngroupedVertices} is enabled. + * + * @return The edge tuple indices. + */ + private int[] getInternalEdgeProjectionIndices() { + return IntStream.range(GroupingConstants.EDGE_RETENTION_OFFSET, GroupingConstants.EDGE_RETENTION_OFFSET + + GroupingConstants.EDGE_TUPLE_RESERVED + edgeGroupingKeys.size() + edgeAggregateFunctions.size()) + .toArray(); + } + /** * Enable or disable an optional combine step before the reduce step. * Note that this currently only affects the edge reduce step. @@ -203,4 +273,25 @@ public KeyedGrouping setUseGroupCombine(boolean useGroupCombine this.useGroupCombine = useGroupCombine; return this; } + + /** + * Enable or disable vertex retention. + * Vertices will be retained, if all key functions return true for + * {@link KeyFunction#retainElement(Object)}. + * For example {@code KeyFunctionWithDefaultValue} returns true, + * if the extracted key equals the default key. + *

+ * Retained vertices will not be grouped and returned without modifications. + * Edges between retained vertices will not be grouped and returned without modifications. + * Edges between retained vertices and grouped vertices will be grouped. + *

+ * This is disabled per default. + * + * @param retainVertices Should vertices be retained? + * @return This operator. + */ + public KeyedGrouping setRetainUngroupedVertices(boolean retainVertices) { + this.retainUngroupedVertices = retainVertices; + return this; + } } diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdges.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdges.java index 206c7de5065a..e1a175d4d491 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdges.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdges.java @@ -30,6 +30,28 @@ */ public class BuildTuplesFromEdges extends BuildTuplesFromElements { + /** + * An additional edge offset. All tuple accesses will be shifted by this value. + */ + private final int offset; + + /** + * Initialize this function, setting the grouping keys and aggregate functions.

+ * This constructor will consider additional reserved fields in the edge tuple. + * + * @param keys The grouping keys. + * @param aggregateFunctions The aggregate functions used to determine the aggregate property + * @param additionalOffset An additional number of fields to be reserved in edge tuples. + */ + protected BuildTuplesFromEdges(List> keys, List aggregateFunctions, + int additionalOffset) { + super(GroupingConstants.EDGE_TUPLE_RESERVED + additionalOffset, keys, aggregateFunctions); + if (additionalOffset < 0) { + throw new IllegalArgumentException("Additional offset can not be negative: " + additionalOffset); + } + this.offset = additionalOffset; + } + /** * Initialize this function, setting the grouping keys and aggregate functions. * @@ -37,14 +59,14 @@ public class BuildTuplesFromEdges extends BuildTuplesFromElement * @param aggregateFunctions The aggregate functions used to determine the aggregate property */ public BuildTuplesFromEdges(List> keys, List aggregateFunctions) { - super(GroupingConstants.EDGE_TUPLE_RESERVED, keys, aggregateFunctions); + this(keys, aggregateFunctions, 0); } @Override public Tuple map(E element) throws Exception { final Tuple result = super.map(element); - result.setField(element.getSourceId(), GroupingConstants.EDGE_TUPLE_SOURCEID); - result.setField(element.getTargetId(), GroupingConstants.EDGE_TUPLE_TARGETID); + result.setField(element.getSourceId(), GroupingConstants.EDGE_TUPLE_SOURCEID + offset); + result.setField(element.getTargetId(), GroupingConstants.EDGE_TUPLE_TARGETID + offset); return result; } } diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdgesWithId.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdgesWithId.java new file mode 100644 index 000000000000..d06ff56a03e9 --- /dev/null +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdgesWithId.java @@ -0,0 +1,51 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.functions; + +import org.apache.flink.api.java.tuple.Tuple; +import org.gradoop.common.model.api.entities.Edge; +import org.gradoop.flink.model.api.functions.AggregateFunction; +import org.gradoop.flink.model.api.functions.KeyFunction; + +import java.util.List; + +/** + * Build a tuple-based representation of edges for grouping with an additional source ID field at position + * {@value GroupingConstants#EDGE_TUPLE_ID}. All other fields will be shifted by + * {@value GroupingConstants#EDGE_RETENTION_OFFSET}. + * + * @param The edge type. + */ +public class BuildTuplesFromEdgesWithId extends BuildTuplesFromEdges { + + /** + * Initialize this function, setting the grouping keys and aggregate functions. + * + * @param keys The edge grouping keys. + * @param aggregateFunctions The edge aggregate functions. + */ + public BuildTuplesFromEdgesWithId(List> keys, + List aggregateFunctions) { + super(keys, aggregateFunctions, GroupingConstants.EDGE_RETENTION_OFFSET); + } + + @Override + public Tuple map(E element) throws Exception { + final Tuple tuple = super.map(element); + tuple.setField(element.getId(), GroupingConstants.EDGE_TUPLE_ID); + return tuple; + } +} diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromElements.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromElements.java index f65c6325dac8..2b32b86332e5 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromElements.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromElements.java @@ -105,7 +105,8 @@ public BuildTuplesFromElements(int tupleDataOffset, List> keys reuseTuple = Tuple.newInstance(tupleSize); // Fill first fields with default ID values. for (int i = 0; i < tupleDataOffset; i++) { - reuseTuple.setField(GradoopId.NULL_VALUE, i); + // The copy is needed to protect NULL_VALUE from changes. See #1466 + reuseTuple.setField(GradoopId.NULL_VALUE.copy(), i); } } diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/CreateElementMappingToSelf.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/CreateElementMappingToSelf.java new file mode 100644 index 000000000000..355b88970ae1 --- /dev/null +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/CreateElementMappingToSelf.java @@ -0,0 +1,42 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.functions; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.common.model.api.entities.Identifiable; +import org.gradoop.common.model.impl.id.GradoopId; + +/** + * Create a mapping (in the form of a {@link Tuple2}) from the ID of an element to itself. + * + * @param The element type. + */ +public class CreateElementMappingToSelf + implements MapFunction> { + + /** + * Reduce object instantiations. + */ + private final Tuple2 reuse = new Tuple2<>(); + + @Override + public Tuple2 map(E element) { + reuse.f0 = element.getId(); + reuse.f1 = element.getId(); + return reuse; + } +} diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/FilterEdgesToGroup.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/FilterEdgesToGroup.java new file mode 100644 index 000000000000..699f2da241c4 --- /dev/null +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/FilterEdgesToGroup.java @@ -0,0 +1,33 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.functions; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.java.tuple.Tuple; +import org.gradoop.common.model.impl.id.GradoopId; + +/** + * A filter function accepting all edges that were not marked for retention. + * + * @param The edge tuple type. + */ +public class FilterEdgesToGroup implements FilterFunction { + + @Override + public boolean filter(T tuple) { + return tuple.getField(GroupingConstants.EDGE_TUPLE_ID).equals(GradoopId.NULL_VALUE); + } +} diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/FilterSuperVertices.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/FilterSuperVertices.java index 9fdf5ff3cfdd..85c8777c3279 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/FilterSuperVertices.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/FilterSuperVertices.java @@ -16,7 +16,6 @@ package org.gradoop.flink.model.impl.operators.keyedgrouping.functions; import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.java.functions.FunctionAnnotation; import org.apache.flink.api.java.tuple.Tuple; import org.gradoop.common.model.impl.id.GradoopId; @@ -26,7 +25,6 @@ * * @param The type of the vertex-tuples. */ -@FunctionAnnotation.ReadFields({"f0", "f1"}) public class FilterSuperVertices implements FilterFunction { @Override diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/GroupingConstants.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/GroupingConstants.java index 606c981d320c..c1b0ce15ad3d 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/GroupingConstants.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/GroupingConstants.java @@ -43,4 +43,18 @@ public abstract class GroupingConstants { * The number of reserved fields in the tuple-representation of an edge. */ public static final int EDGE_TUPLE_RESERVED = 2; + /** + * The number of additionally reserved fields in the tuple-representation of an edge. + */ + public static final int EDGE_DEFAULT_OFFSET = 0; + /** + * The number of additionally reserved fields in the tuple-representation of an edge, when retention of + * ungrouped vertices is enabled. + */ + public static final int EDGE_RETENTION_OFFSET = 1; + /** + * The index of the ID in the tuple-representation of an edge. This will only be available + * if retention of ungrouped vertices is enabled. + */ + public static final int EDGE_TUPLE_ID = 0; } diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/PickRetainedEdgeIDs.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/PickRetainedEdgeIDs.java new file mode 100644 index 000000000000..8c8dbdca2c71 --- /dev/null +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/PickRetainedEdgeIDs.java @@ -0,0 +1,37 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.functions; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.util.Collector; +import org.gradoop.common.model.impl.id.GradoopId; + +/** + * Picks the ID from a tuple if that ID is not {@link GradoopId#NULL_VALUE}. + * + * @param The input tuple type. + */ +public class PickRetainedEdgeIDs implements FlatMapFunction { + + @Override + public void flatMap(T tuple, Collector collector) { + final GradoopId id = tuple.getField(GroupingConstants.EDGE_TUPLE_ID); + if (!id.equals(GradoopId.NULL_VALUE)) { + collector.collect(id); + } + } +} diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/UpdateIdField.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/UpdateIdField.java index 1b991dcee5b5..c3dcf9cbfcfd 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/UpdateIdField.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/UpdateIdField.java @@ -45,7 +45,9 @@ public UpdateIdField(int index) { @Override public T join(T inputTuple, Tuple2 updateValue) throws Exception { - inputTuple.setField(updateValue.f1, index); + if (updateValue != null) { + inputTuple.setField(updateValue.f1, index); + } return inputTuple; } } diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/UpdateIdFieldAndMarkTuple.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/UpdateIdFieldAndMarkTuple.java new file mode 100644 index 000000000000..84b0c32384a8 --- /dev/null +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/UpdateIdFieldAndMarkTuple.java @@ -0,0 +1,60 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.functions; + +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.common.model.impl.id.GradoopId; + +/** + * Updates the an ID field of an edge tuple to the ID of the corresponding super vertex.

+ * This function is used when retention of ungrouped vertices is enabled. In this case edge tuples have an + * additional ID field. This field will initially be equal to the ID of the edge. When the ID field is + * updated by this function, that field will be set to {@link GradoopId#NULL_VALUE} instead. + * + * @param The edge tuple type. + */ +public class UpdateIdFieldAndMarkTuple + implements JoinFunction, T> { + + /** + * The index of the field to update. + */ + private final int index; + + /** + * Create a new instance of this update function. + * + * @param index The index of the field to update (without offset). + */ + public UpdateIdFieldAndMarkTuple(int index) { + if (index < 0) { + throw new IllegalArgumentException("Index can not be negative."); + } + this.index = index + GroupingConstants.EDGE_RETENTION_OFFSET; + } + + @Override + public T join(T edgeTuple, Tuple2 mapping) { + if (!mapping.f0.equals(mapping.f1)) { + // Mark the tuple and update the field, if the mapping would actually change it. + GradoopId.NULL_VALUE.copyTo(edgeTuple.getField(GroupingConstants.EDGE_TUPLE_ID)); + edgeTuple.setField(mapping.f1, index); + } + return edgeTuple; + } +} diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/CompositeKeyFunction.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/CompositeKeyFunction.java index 3514548b6bda..8b2947814b56 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/CompositeKeyFunction.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/CompositeKeyFunction.java @@ -34,7 +34,7 @@ public class CompositeKeyFunction implements KeyFunction { /** * A list of grouping key functions combined in this key function. */ - private final List> componentFunctions; + protected final List> componentFunctions; /** * Reduce object instantiations. @@ -81,4 +81,9 @@ public TypeInformation getType() { } return new TupleTypeInfo<>(types); } + + @Override + public boolean retainElement(T element) { + return componentFunctions.stream().allMatch(k -> k.retainElement(element)); + } } diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/PropertyKeyFunction.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/PropertyKeyFunction.java index dba3aa333309..19302e7b6cb4 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/PropertyKeyFunction.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/PropertyKeyFunction.java @@ -66,4 +66,9 @@ public TypeInformation getType() { public byte[] getDefaultKey() { return PropertyValue.NULL_VALUE.getRawBytes(); } + + @Override + public boolean retainElement(T element) { + return !element.hasProperty(propertyKey); + } } diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/labelspecific/LabelSpecificKeyFunction.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/labelspecific/LabelSpecificKeyFunction.java index bd213d9777a5..5f104778ebfe 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/labelspecific/LabelSpecificKeyFunction.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/labelspecific/LabelSpecificKeyFunction.java @@ -30,10 +30,20 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; /** * A grouping key function that extracts grouping keys only for specific labels. + * It wraps a map of labels and key functions and returns a tuple of keys as key. + *

+ * The first position of the tuple represents the label of the element (stored as index). + * Each other position corresponds to a label. + *

+ * The position corresponding to the element label contains a combined key for that label. + * Every other position contains default values. + * If the element label is not represented in the tuple, a key function representing + * the default label group is used. * * @param The type of the elements to group. */ @@ -49,7 +59,9 @@ public class LabelSpecificKeyFunction implements KeyFunction< public static final String DEFAULT_GROUP_LABEL = Grouping.DEFAULT_VERTEX_LABEL_GROUP; /** - * A map assigning an internally used number to each label. + * A map assigning an index to each label. + * It encodes the position of labels in {@code targetLabels} + * and of corresponding key functions in {@code keyFunctions}. */ private final Map labelToIndex; @@ -91,15 +103,18 @@ public LabelSpecificKeyFunction(Map Tuple.MAX_ARITY) { - throw new IllegalArgumentException("Too many labels. Tuple arity exceeded: " + (totalLabels + 1) + - " (max.: " + Tuple.MAX_ARITY + ")"); + throw new IllegalArgumentException( + String.format("Too many labels. Tuple arity exceeded: %d (max.: %d)", + totalLabels + 1, Tuple.MAX_ARITY)); } + int labelNr = 1; labelToIndex = new HashMap<>(); // The list needs to be filled initially, the set(int,Object) function will fail otherwise. keyFunctions = new ArrayList<>(Collections.nCopies(totalLabels, null)); targetLabels = new String[totalLabels]; - for (Map.Entry>> labelToKeys : labelsWithKeys.entrySet()) { + + for (Entry>> labelToKeys : labelsWithKeys.entrySet()) { final String key = labelToKeys.getKey(); final List> keysForLabel = labelToKeys.getValue(); if (key.equals(defaultGroupLabel)) { @@ -114,8 +129,9 @@ public LabelSpecificKeyFunction(Map(keysForLabel)); labelNr++; } + if (labelToSuperLabel != null) { - for (Map.Entry labelUpdateEntry : labelToSuperLabel.entrySet()) { + for (Entry labelUpdateEntry : labelToSuperLabel.entrySet()) { Integer index = labelToIndex.get(labelUpdateEntry.getKey()); if (index == null) { continue; @@ -128,13 +144,10 @@ public LabelSpecificKeyFunction(Map getType() { } return new TupleTypeInfo<>(types); } + + @Override + public boolean retainElement(T element) { + Integer index = labelToIndex.getOrDefault(element.getLabel(), 0); + return keyFunctions.get(index).retainElement(element); + } } diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/labelspecific/WithAllKeysRetained.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/labelspecific/WithAllKeysRetained.java new file mode 100644 index 000000000000..4eff31feb5e8 --- /dev/null +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/labelspecific/WithAllKeysRetained.java @@ -0,0 +1,49 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.labelspecific; + +import org.gradoop.flink.model.api.functions.KeyFunction; +import org.gradoop.flink.model.impl.functions.filters.CombinableFilter; + +import java.util.List; +import java.util.Objects; + +/** + * A filter function selecting elements where all keys vote to retain it. + * + * @param The type of the elements to filter. + */ +public class WithAllKeysRetained implements CombinableFilter { + + /** + * The keys to check on each element. + */ + private final List> keys; + + /** + * Create a new instance of this filter function. + * + * @param keys The list of key functions to check. + */ + public WithAllKeysRetained(List> keys) { + this.keys = Objects.requireNonNull(keys); + } + + @Override + public boolean filter(E value) { + return keys.stream().allMatch(k -> k.retainElement(value)); + } +} diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/grouping/VertexRetentionTestBase.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/grouping/VertexRetentionTestBase.java index ec950d0e46fd..4c6b762445e4 100644 --- a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/grouping/VertexRetentionTestBase.java +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/grouping/VertexRetentionTestBase.java @@ -116,6 +116,39 @@ public void testRetentionSingleProperty() throws Exception { output.equalsByElementData(loader.getLogicalGraphByVariable("expected"))); } + /** + * Tests correct retention of a vertex with a null property. + * + * @throws Exception if collecting result values fails + */ + @Test + public void testRetentionNullProperty() throws Exception { + String asciiInput = "input[" + + "(v0 {a: NULL})" + // group + "]"; + + FlinkAsciiGraphLoader loader = getLoaderFromString(asciiInput); + + loader.appendToDatabaseFromString( + "expected[" + + "(v00 {a: NULL, count: 1L})" + + "]"); + + final LogicalGraph input = loader.getLogicalGraphByVariable("input"); + + LogicalGraph output = new Grouping.GroupingBuilder() + .setStrategy(getStrategy()) + .retainVerticesWithoutGroup() + .useVertexLabel(true) + .addVertexGroupingKey("a") + .addVertexAggregateFunction(new Count()) + .build() + .execute(input); + + collectAndAssertTrue( + output.equalsByElementData(loader.getLogicalGraphByVariable("expected"))); + } + /** * Tests function {@link Grouping#groupInternal(BaseGraph)}. * Tests correct retention of a vertex with multiple properties. @@ -898,6 +931,7 @@ public void testLabelSpecificGroupingNoVerticesMatch() throws Exception { * - a vertex with a matching label, no properties: retain * - a vertex with a matching label, one matching property: retain * - two vertices with matching labels, two matching properties: group + * - two vertices with matching labels, two matching null properties: group * * @throws Exception if collecting result values fails */ @@ -913,6 +947,7 @@ public void testLabelSpecificGrouping() throws Exception { "(v6:A {})" + "(v7:A {a : 1})" + "(v8:A {a : 1, b : 2})" + + "(v9:A {a : NULL, b : NULL})" + "]"; FlinkAsciiGraphLoader loader = getLoaderFromString(asciiInput); @@ -928,6 +963,7 @@ public void testLabelSpecificGrouping() throws Exception { "(v06:A {})" + "(v07:A {a : 1})" + "(v08:A {a : 1, b : 2, count: 1L})" + + "(v09:A {a : NULL, b : NULL, count: 1L})" + "]"); final LogicalGraph input = loader.getLogicalGraphByVariable("input"); @@ -1049,4 +1085,30 @@ public void testLabelSpecificGroupingNoGlobalPropertyGrouping() throws Exception collectAndAssertTrue( output.equalsByElementData(loader.getLogicalGraphByVariable("expected"))); } + + /** + * Test if edges are properly updated when the source or target vertex was retained. + * + * @throws Exception when the execution in Flink fails. + */ + @Test + public void testEdgeUpdateWithRetainedSourceOrTarget() throws Exception { + FlinkAsciiGraphLoader loader = getLoaderFromString("input[" + + "(retained:Retained {otherprop: 1L})-[e:edge]->(notretained:NotRetained {prop: 1L, otherprop: 1L})" + + "-[:otherEdge {otherprop: 1L}]->(:NotRetained2 {prop:1L, otherprop: 2L})-[e2:edge2]->" + + "(retained2:RetainedTarget {otherprop: 2L})" + + "] expected [" + + "(retained)-->(resv {prop: 1L})-->(resv)-->(retained2)" + + "]"); + LogicalGraph input = loader.getLogicalGraphByVariable("input"); + LogicalGraph output = new Grouping.GroupingBuilder() + .setStrategy(getStrategy()) + .retainVerticesWithoutGroup() + .useVertexLabel(false) + .addVertexGroupingKey("prop") + .build() + .execute(input); + + collectAndAssertTrue(output.equalsByData(loader.getLogicalGraphByVariable("expected"))); + } } diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/KeyedGroupingVertexRetentionTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/KeyedGroupingVertexRetentionTest.java new file mode 100644 index 000000000000..2e9bc0be807c --- /dev/null +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/KeyedGroupingVertexRetentionTest.java @@ -0,0 +1,225 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping; + +import org.gradoop.common.model.impl.pojo.EPGMEdge; +import org.gradoop.common.model.impl.pojo.EPGMGraphHead; +import org.gradoop.common.model.impl.pojo.EPGMVertex; +import org.gradoop.flink.model.impl.epgm.GraphCollection; +import org.gradoop.flink.model.impl.epgm.LogicalGraph; +import org.gradoop.flink.model.impl.operators.aggregation.functions.count.Count; +import org.gradoop.flink.model.impl.operators.grouping.Grouping; +import org.gradoop.flink.model.impl.operators.grouping.GroupingStrategy; +import org.gradoop.flink.model.impl.operators.grouping.VertexRetentionTestBase; +import org.gradoop.flink.util.FlinkAsciiGraphLoader; + +import java.util.Arrays; +import java.util.Collections; + +/** + * Tests if {@link KeyedGrouping#setRetainUngroupedVertices(boolean)} works as expected. + */ +public class KeyedGroupingVertexRetentionTest extends VertexRetentionTestBase { + + @Override + protected GroupingStrategy getStrategy() { + return GroupingStrategy.GROUP_WITH_KEYFUNCTIONS; + } + + /** + * This test is overwritten here, since it does not work in the same way for {@link KeyedGrouping}. + */ + @Override + public void testRetainVerticesFlag() { + } + + /** + * Test grouping on label and two properties.

+ * The expected result of this test is different, since vertices with one property or a label are grouped + * together according to that property or label. + * + * @throws Exception When the execution in Flink fails. + */ + @Override + public void testGroupByLabelAndProperties() throws Exception { + String asciiInput = "input[" + + "(v1 {a : 1})" + + "(v2 {a : 1, b : 2})" + + "(v4:B {a : 1})" + + "(v5:B {a : 1, b : 2})" + + "]"; + + FlinkAsciiGraphLoader loader = getLoaderFromString(asciiInput); + + loader.appendToDatabaseFromString( + "expected[" + + "(v01 {a : 1, b: NULL, count: 1L})" + + "(v02 {a : 1, b : 2, count : 1L})" + + "(v04:B {a : 1, b: NULL, count : 1L})" + + "(v05:B {a : 1, b : 2, count : 1L})" + + "]"); + + final LogicalGraph input = loader.getLogicalGraphByVariable("input"); + + LogicalGraph output = new Grouping.GroupingBuilder() + .setStrategy(getStrategy()) + .retainVerticesWithoutGroup() + .useVertexLabel(true) + .addVertexGroupingKeys(Arrays.asList("a", "b")) + .addVertexAggregateFunction(new Count()) + .build() + .execute(input); + + collectAndAssertTrue( + output.equalsByElementData(loader.getLogicalGraphByVariable("expected"))); + } + + /** + * Test grouping on labels and one property with retention enabled.

+ * The expected result is different, since the two vertices with label "B" and no property "a" are + * grouped together. + * + * @throws Exception when the execution in Flink fails. + */ + @Override + public void testGroupByLabelAndProperty() throws Exception { + String asciiInput = "input[" + + "(v0 {})" + + "(v1 {a : 1})" + + "(v2 {b : 2})" + + "(v3:B {})" + + "(v4:B {a : 1})" + + "(v5:B {b : 2})" + + "]"; + + FlinkAsciiGraphLoader loader = getLoaderFromString(asciiInput); + + loader.appendToDatabaseFromString( + "expected[" + + "(v00 {})" + + "(v01 {a : 1, count : 1L})" + + "(v02 {b : 2})" + + "(v03:B {a: NULL, count: 2L})" + + "(v04:B {a : 1, count : 1L})" + + "]"); + + final LogicalGraph input = loader.getLogicalGraphByVariable("input"); + + LogicalGraph output = new Grouping.GroupingBuilder() + .setStrategy(getStrategy()) + .retainVerticesWithoutGroup() + .useVertexLabel(true) + .addVertexGroupingKey("a") + .addVertexAggregateFunction(new Count()) + .build() + .execute(input); + + collectAndAssertTrue( + output.equalsByElementData(loader.getLogicalGraphByVariable("expected"))); + } + + /** + * Test grouping using two properties.

+ * The expected result is different here, since vertices with only one of two properties set are also + * grouped together. + * + * @throws Exception when the execution in Flink fails. + */ + @Override + public void testGroupByProperties() throws Exception { + String asciiInput = "input[" + + "(v1 {a : 1})" + + "(v2 {a : 1, b : 2})" + + "(v4:B {a : 1})" + + "(v5:B {a : 1, b : 2})" + + "]"; + + FlinkAsciiGraphLoader loader = getLoaderFromString(asciiInput); + + loader.appendToDatabaseFromString( + "expected[" + + "(v01 {a : 1, b: NULL, count : 2L})" + + "(v0205 {a : 1, b : 2, count : 2L})" + + "]"); + + final LogicalGraph input = loader.getLogicalGraphByVariable("input"); + + LogicalGraph output = new Grouping.GroupingBuilder() + .setStrategy(getStrategy()) + .retainVerticesWithoutGroup() + .useVertexLabel(false) + .addVertexGroupingKeys(Arrays.asList("a", "b")) + .addVertexAggregateFunction(new Count()) + .build() + .execute(input); + + collectAndAssertTrue( + output.equalsByElementData(loader.getLogicalGraphByVariable("expected"))); + } + + /** + * Test label specific grouping with vertex retention enabled. + * The expected result is different here (v7), since vertices with only one of two properties set + * are also grouped together. + * + * @throws Exception when the execution in Flink fails. + */ + @Override + public void testLabelSpecificGrouping() throws Exception { + String asciiInput = "input[" + + "(v0 {})" + + "(v1 {a : 1})" + + "(v2 {a : 1, b : 2})" + + "(v3:B {})" + + "(v4:B {a : 1})" + + "(v5:B {a : 1, b : 2})" + + "(v6:A {})" + + "(v7:A {a : 1})" + + "(v8:A {a : 1, b : 2})" + + "(v9:A {a : NULL, b : NULL})" + + "]"; + + FlinkAsciiGraphLoader loader = getLoaderFromString(asciiInput); + + loader.appendToDatabaseFromString( + "expected[" + + "(v00 {})" + + "(v01 {a : 1})" + + "(v02 {a : 1, b : 2})" + + "(v03:B {})" + + "(v04:B {a : 1})" + + "(v05:B {a : 1, b : 2})" + + "(v06:A)" + + "(v07:A {a : 1, b : NULL, count : 1L})" + + "(v08:A {a : 1, b : 2, count: 1L})" + + "(v09:A {a : NULL, b : NULL, count: 1L})" + + "]"); + + final LogicalGraph input = loader.getLogicalGraphByVariable("input"); + + LogicalGraph output = new Grouping.GroupingBuilder() + .setStrategy(getStrategy()) + .retainVerticesWithoutGroup() + .useVertexLabel(false) + .addVertexLabelGroup("A", "A", Arrays.asList("a", "b"), + Collections.singletonList(new Count())) + .build() + .execute(input); + + collectAndAssertTrue( + output.equalsByElementData(loader.getLogicalGraphByVariable("expected"))); + } +} diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdgesWithIdTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdgesWithIdTest.java new file mode 100644 index 000000000000..68f611d78ddb --- /dev/null +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdgesWithIdTest.java @@ -0,0 +1,60 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.functions; + +import org.apache.flink.api.java.tuple.Tuple; +import org.gradoop.common.model.api.entities.Edge; +import org.gradoop.common.model.impl.id.GradoopId; +import org.gradoop.common.model.impl.properties.PropertyValue; +import org.gradoop.flink.model.GradoopFlinkTestBase; +import org.gradoop.flink.model.impl.operators.aggregation.functions.max.MaxProperty; +import org.gradoop.flink.model.impl.operators.keyedgrouping.keys.LabelKeyFunction; +import org.junit.Test; + +import static java.util.Collections.singletonList; +import static org.junit.Assert.assertEquals; + +/** + * Test for the {@link BuildTuplesFromEdgesWithId} function. + */ +public class BuildTuplesFromEdgesWithIdTest extends GradoopFlinkTestBase { + + /** + * Test the {@link BuildTuplesFromEdgesWithId#map(Edge)} functionality. + * + * @throws Exception when the function throws an exception + */ + @Test + public void testMap() throws Exception { + GradoopId source = GradoopId.get(); + GradoopId target = GradoopId.get(); + Edge testEdge = getConfig().getLogicalGraphFactory().getEdgeFactory().createEdge(source, target); + String testLabel = "a"; + String testAggKey = "key"; + PropertyValue testAggValue = PropertyValue.create(17L); + testEdge.setLabel(testLabel); + testEdge.setProperty(testAggKey, testAggValue); + BuildTuplesFromEdgesWithId function = new BuildTuplesFromEdgesWithId<>( + singletonList(new LabelKeyFunction<>()), singletonList(new MaxProperty(testAggKey))); + Tuple result = function.map(testEdge); + assertEquals("Invalid result tuple size", 5, result.getArity()); + assertEquals(testEdge.getId(), result.getField(0)); + assertEquals(source, result.getField(1)); + assertEquals(target, result.getField(2)); + assertEquals(testLabel, result.getField(3)); + assertEquals(testAggValue, result.getField(4)); + } +} diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/PickRetainedEdgeIDsTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/PickRetainedEdgeIDsTest.java new file mode 100644 index 000000000000..28ba3fdf08e0 --- /dev/null +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/PickRetainedEdgeIDsTest.java @@ -0,0 +1,51 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.functions; + +import org.apache.flink.api.common.functions.util.ListCollector; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.util.Collector; +import org.gradoop.common.model.impl.id.GradoopId; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; + +/** + * Test for the {@link PickRetainedEdgeIDs} function. + */ +public class PickRetainedEdgeIDsTest { + + /** + * Test the {@link PickRetainedEdgeIDs#flatMap(Tuple, Collector)} functionality. + */ + @Test + public void testFlatMap() { + PickRetainedEdgeIDs> toTest = new PickRetainedEdgeIDs<>(); + List result = new ArrayList<>(); + Collector collector = new ListCollector(result); + GradoopId someId = GradoopId.get(); + GradoopId otherId = GradoopId.get(); + toTest.flatMap(Tuple1.of(someId), collector); + toTest.flatMap(Tuple1.of(GradoopId.NULL_VALUE), collector); + toTest.flatMap(Tuple1.of(otherId), collector); + collector.close(); + assertArrayEquals(new GradoopId[] {someId, otherId}, result.toArray()); + } +} diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/UpdateIdFieldAndMarkTupleTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/UpdateIdFieldAndMarkTupleTest.java new file mode 100644 index 000000000000..65825df06b03 --- /dev/null +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/UpdateIdFieldAndMarkTupleTest.java @@ -0,0 +1,61 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.functions; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.common.model.impl.id.GradoopId; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * Test for the {@link UpdateIdFieldAndMarkTuple} function. + */ +public class UpdateIdFieldAndMarkTupleTest { + + /** + * Test the join function in cases where the update should be performed. + */ + @Test + public void testWithUpdate() { + GradoopId id = GradoopId.get(); + GradoopId foreignKey = GradoopId.get(); + GradoopId newForeignKey = GradoopId.get(); + Tuple2 input = Tuple2.of(id, foreignKey); + Tuple2 mapping = Tuple2.of(foreignKey, newForeignKey); + assertNotEquals(GradoopId.NULL_VALUE, input.f0); + Tuple2 result = + new UpdateIdFieldAndMarkTuple>(0).join(input, mapping); + assertEquals(GradoopId.NULL_VALUE, result.f0); + assertEquals(newForeignKey, result.f1); + } + + /** + * Test the join function in cases where the update should not be performed. + */ + @Test + public void testWithNoUpdate() { + GradoopId id = GradoopId.get(); + GradoopId foreignKey = GradoopId.get(); + Tuple2 input = Tuple2.of(id, foreignKey); + Tuple2 mapping = Tuple2.of(foreignKey, foreignKey); + Tuple2 result = new UpdateIdFieldAndMarkTuple>(0) + .join(input, mapping); + assertEquals(id, result.f0); + assertEquals(foreignKey, result.f1); + } +} diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/CompositeKeyFunctionWithDefaultValuesTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/CompositeKeyFunctionWithDefaultValuesTest.java new file mode 100644 index 000000000000..90ebac2d815c --- /dev/null +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/CompositeKeyFunctionWithDefaultValuesTest.java @@ -0,0 +1,104 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.keys; + +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.common.GradoopTestUtils; +import org.gradoop.common.model.api.entities.Element; +import org.gradoop.common.model.api.entities.VertexFactory; +import org.gradoop.common.model.impl.pojo.EPGMVertex; +import org.gradoop.common.model.impl.properties.PropertyValue; +import org.gradoop.flink.model.api.functions.KeyFunction; +import org.gradoop.flink.model.api.functions.KeyFunctionWithDefaultValue; +import org.junit.Test; + +import java.util.Arrays; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * Test for the {@link CompositeKeyFunctionWithDefaultValues}. + */ +public class CompositeKeyFunctionWithDefaultValuesTest extends KeyFunctionTestBase { + + /** + * A property key for one component function of the test function. + */ + private final String propertyKey = "key"; + + @Override + public void setUp() { + checkForKeyEquality = false; + } + + @Override + protected KeyFunction getInstance() { + return new CompositeKeyFunctionWithDefaultValues<>( + Arrays.asList(new PropertyKeyFunction<>(propertyKey), new LabelKeyFunction<>())); + } + + /** + * Test if the default key has the correct value.

+ */ + @Test + public void checkDefaultKeyValue() { + final Tuple defaultKey = ((KeyFunctionWithDefaultValue) getInstance()).getDefaultKey(); + assertEquals(2, defaultKey.getArity()); + assertArrayEquals(PropertyValue.NULL_VALUE.getRawBytes(), defaultKey.getField(0)); + assertEquals("", defaultKey.getField(1)); + } + + @Override + public void testGetKey() { + final KeyFunction keyFunction = getInstance(); + final VertexFactory vertexFactory = getConfig().getLogicalGraphFactory().getVertexFactory(); + EPGMVertex testVertex = vertexFactory.createVertex(); + Tuple key = keyFunction.getKey(testVertex); + assertEquals(2, key.getArity()); + assertArrayEquals(PropertyValue.NULL_VALUE.getRawBytes(), key.getField(0)); + assertEquals("", key.getField(1)); + final String testLabel = "testLabel"; + testVertex.setLabel(testLabel); + key = keyFunction.getKey(testVertex); + assertEquals(2, key.getArity()); + assertArrayEquals(PropertyValue.NULL_VALUE.getRawBytes(), key.getField(0)); + assertEquals(testLabel, key.getField(1)); + final PropertyValue testValue = PropertyValue.create(GradoopTestUtils.DOUBLE_VAL_5); + testVertex.setProperty(propertyKey, testValue); + key = keyFunction.getKey(testVertex); + assertEquals(2, key.getArity()); + assertArrayEquals(testValue.getRawBytes(), key.getField(0)); + assertEquals(testLabel, key.getField(1)); + } + + /** + * Test for the {@link CompositeKeyFunctionWithDefaultValues#addKeyToElement(Object, Object)} function. + */ + @Test + public void testAddKeyToElement() { + final EPGMVertex vertex = getConfig().getLogicalGraphFactory().getVertexFactory().createVertex(); + final PropertyValue testValue = PropertyValue.create(GradoopTestUtils.BIG_DECIMAL_VAL_7); + final String testLabel = "testLabel"; + assertEquals("", vertex.getLabel()); + assertFalse(vertex.hasProperty(propertyKey)); + getInstance().addKeyToElement(vertex, Tuple2.of(testValue.getRawBytes(), testLabel)); + assertEquals(testValue, vertex.getPropertyValue(propertyKey)); + assertEquals(testLabel, vertex.getLabel()); + } +} diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/ConstantKeyFunctionTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/ConstantKeyFunctionTest.java new file mode 100644 index 000000000000..d84659bc2a6f --- /dev/null +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/ConstantKeyFunctionTest.java @@ -0,0 +1,40 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.keys; + +import org.gradoop.common.model.api.entities.Element; +import org.gradoop.flink.model.api.functions.KeyFunction; + +import java.util.HashMap; +import java.util.Map; + +/** + * Test for the {@link ConstantKeyFunction}. + */ +public class ConstantKeyFunctionTest extends KeyFunctionTestBase { + + @Override + protected KeyFunction getInstance() { + return new ConstantKeyFunction<>(); + } + + @Override + protected Map getTestElements() { + Map testCases = new HashMap<>(); + testCases.put(getConfig().getLogicalGraphFactory().getVertexFactory().createVertex(), Boolean.TRUE); + return testCases; + } +} diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/KeyFunctionTestBase.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/KeyFunctionTestBase.java new file mode 100644 index 000000000000..5814dc8ad12e --- /dev/null +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/KeyFunctionTestBase.java @@ -0,0 +1,133 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.keys; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.gradoop.flink.model.GradoopFlinkTestBase; +import org.gradoop.flink.model.api.functions.KeyFunction; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * A base class for key-function tests. This provides some common tests that should pass by all keys to be + * used for grouping. + * + * @param The element type. + * @param The key type. + */ +public abstract class KeyFunctionTestBase extends GradoopFlinkTestBase { + + /** + * Flag indicating whether a serialized and deserialized key should be checked for equality. + * Setting this to false might be useful when testing composite keys or keys that use array types. + */ + protected boolean checkForKeyEquality; + + /** + * Get an instance of the key function to test. + * + * @return The key function. + */ + protected abstract KeyFunction getInstance(); + + /** + * Get a {@link Map} of test cases for the {@link KeyFunction#getKey(Object)} method. + * Keys of this map will be elements and values keys extracted from those elements.

+ * The default implementation of this method provides no test elements. + * + * @return A map of test cases. + */ + protected Map getTestElements() { + return Collections.emptyMap(); + } + + /** + * Setup this test. + */ + @Before + public void setUp() { + checkForKeyEquality = true; + } + + /** + * Check if the {@link KeyFunction#getType()} function returns a non-{@code null} value that is a valid + * key type. + */ + @Test + public void checkTypeInfo() { + TypeInformation type = getInstance().getType(); + assertNotNull("Type information provided by the key fuction was null.", type); + assertTrue("Type is not a valid key type.", type.isKeyType()); + assertNotEquals("Key type has no fields.", 0, type.getTotalFields()); + } + + /** + * Check if a key is of a certain type and if it is serializable as that type. + * + * @param type The type. + * @param key The key to check. + * @throws IOException when serialization of the key fails. + */ + protected void checkKeyType(TypeInformation type, K key) throws IOException { + assertTrue(type.getTypeClass().isInstance(key)); + // Check serializability + ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper output = new DataOutputViewStreamWrapper(byteOutput); + TypeSerializer serializer = type.createSerializer(getExecutionEnvironment().getConfig()); + serializer.serialize(key, output); + output.close(); + byte[] serializedData = byteOutput.toByteArray(); + DataInputViewStreamWrapper input = new DataInputViewStreamWrapper( + new ByteArrayInputStream(serializedData)); + K deserializedKey = serializer.deserialize(input); + if (checkForKeyEquality) { + assertTrue(Objects.deepEquals(key, deserializedKey)); + } + input.close(); + } + + /** + * Test the {@link KeyFunction#getKey(Object)} function using test cases provided by + * {@link #getTestElements()}. + * + * @throws IOException when serialization of a key fails + */ + @Test + public void testGetKey() throws IOException { + final KeyFunction function = getInstance(); + TypeInformation type = function.getType(); + for (Map.Entry testCase : getTestElements().entrySet()) { + final K actual = function.getKey(testCase.getKey()); + final K expected = testCase.getValue(); + checkKeyType(type, actual); + assertTrue(Objects.deepEquals(expected, actual)); + } + } +} diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/LabelKeyFunctionTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/LabelKeyFunctionTest.java new file mode 100644 index 000000000000..f790b981c349 --- /dev/null +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/LabelKeyFunctionTest.java @@ -0,0 +1,62 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.keys; + +import org.gradoop.common.model.api.entities.Element; +import org.gradoop.common.model.api.entities.Labeled; +import org.gradoop.common.model.api.entities.VertexFactory; +import org.gradoop.common.model.impl.pojo.EPGMVertex; +import org.gradoop.flink.model.api.functions.KeyFunction; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * Test for the {@link LabelKeyFunction}. + */ +public class LabelKeyFunctionTest extends KeyFunctionTestBase { + + @Override + protected KeyFunction getInstance() { + return new LabelKeyFunction<>(); + } + + @Override + protected Map getTestElements() { + final VertexFactory vertexFactory = getConfig().getLogicalGraphFactory().getVertexFactory(); + Map testCases = new HashMap<>(); + testCases.put(vertexFactory.createVertex(), ""); + final String testLabel = "test"; + testCases.put(vertexFactory.createVertex(testLabel), testLabel); + return testCases; + } + + /** + * Test for the {@link LabelKeyFunction#addKeyToElement(Labeled, Object)} function. + */ + @Test + public void testAddKeyToElement() { + final EPGMVertex testVertex = getConfig().getLogicalGraphFactory().getVertexFactory().createVertex(); + final String testLabel = "testLabel"; + assertNotEquals(testLabel, testVertex.getLabel()); + getInstance().addKeyToElement(testVertex, testLabel); + assertEquals(testLabel, testVertex.getLabel()); + } +} diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/LabelSpecificKeyFunctionTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/LabelSpecificKeyFunctionTest.java index dc8cd293a040..8e12f6ce8f08 100644 --- a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/LabelSpecificKeyFunctionTest.java +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/LabelSpecificKeyFunctionTest.java @@ -22,6 +22,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.gradoop.common.GradoopTestUtils; +import org.gradoop.common.model.api.entities.Element; import org.gradoop.common.model.impl.pojo.EPGMVertex; import org.gradoop.common.model.impl.properties.PropertyValue; import org.gradoop.flink.model.GradoopFlinkTestBase; @@ -44,6 +46,7 @@ import static org.gradoop.flink.model.impl.operators.keyedgrouping.GroupingKeys.property; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -237,6 +240,27 @@ public void testAddKeyToElement() { assertEquals(valueForDefault, testVertex.getPropertyValue("forDefault")); } + /** + * Test if the {@link LabelSpecificKeyFunction#retainElement(Element)} method works as expected. + */ + @Test + public void testElementRetention() { + // The default values are set initially, check them for each label + final EPGMVertex vertex = getConfig().getLogicalGraphFactory().getVertexFactory().createVertex(); + for (String label : Arrays.asList("a", "b", "c", "")) { + vertex.setLabel(label); + assertTrue("Default key check fail for label: " + label, + testFunction.retainElement(vertex)); + } + // Changing the values on the vertex should only affect the result when the key function corresponding + // to the value is used. We check this by setting the property for label "c" to any value. + vertex.setProperty("forC", PropertyValue.create(GradoopTestUtils.BIG_DECIMAL_VAL_7)); + vertex.setLabel("a"); + assertTrue(testFunction.retainElement(vertex)); + vertex.setLabel("c"); + assertFalse(testFunction.retainElement(vertex)); + } + /** * Check if two tuples are equal. This is necessary since {@code byte[]} does not have a valid {@code * equals} implementation. diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/PropertyKeyFunctionTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/PropertyKeyFunctionTest.java new file mode 100644 index 000000000000..67a7bc5b1b94 --- /dev/null +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/keys/PropertyKeyFunctionTest.java @@ -0,0 +1,70 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.keys; + +import org.gradoop.common.GradoopTestUtils; +import org.gradoop.common.model.api.entities.Attributed; +import org.gradoop.common.model.api.entities.Element; +import org.gradoop.common.model.api.entities.VertexFactory; +import org.gradoop.common.model.impl.pojo.EPGMVertex; +import org.gradoop.common.model.impl.properties.PropertyValue; +import org.gradoop.flink.model.api.functions.KeyFunction; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * Test for the {@link PropertyKeyFunction}. + */ +public class PropertyKeyFunctionTest extends KeyFunctionTestBase { + + /** + * The key of the properties to extract. + */ + private final String key = "key"; + + @Override + protected KeyFunction getInstance() { + return new PropertyKeyFunction<>(key); + } + + @Override + protected Map getTestElements() { + final VertexFactory vertexFactory = getConfig().getLogicalGraphFactory().getVertexFactory(); + Map testCases = new HashMap<>(); + testCases.put(vertexFactory.createVertex(), PropertyValue.NULL_VALUE.getRawBytes()); + EPGMVertex testVertex = vertexFactory.createVertex(); + testVertex.setProperty(key, GradoopTestUtils.LONG_VAL_3); + testCases.put(testVertex, PropertyValue.create(GradoopTestUtils.LONG_VAL_3).getRawBytes()); + return testCases; + } + + /** + * Test for the {@link PropertyKeyFunction#addKeyToElement(Attributed, Object)} function. + */ + @Test + public void testAddKeyToElement() { + final EPGMVertex testVertex = getConfig().getLogicalGraphFactory().getVertexFactory().createVertex(); + final PropertyValue testValue = PropertyValue.create(GradoopTestUtils.BIG_DECIMAL_VAL_7); + assertFalse(testVertex.hasProperty(key)); + getInstance().addKeyToElement(testVertex, testValue.getRawBytes()); + assertEquals(testValue, testVertex.getPropertyValue(key)); + } +} diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/labelspecific/WithAllKeysRetainedTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/labelspecific/WithAllKeysRetainedTest.java new file mode 100644 index 000000000000..7daf32705e5b --- /dev/null +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/keyedgrouping/labelspecific/WithAllKeysRetainedTest.java @@ -0,0 +1,63 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.labelspecific; + +import org.gradoop.common.model.api.entities.Element; +import org.gradoop.flink.model.GradoopFlinkTestBase; +import org.gradoop.flink.model.impl.operators.keyedgrouping.keys.LabelKeyFunction; +import org.gradoop.flink.model.impl.operators.keyedgrouping.keys.PropertyKeyFunction; +import org.junit.Test; + +import java.util.Arrays; + +import static org.gradoop.common.GradoopTestUtils.BIG_DECIMAL_VAL_7; +import static org.gradoop.common.GradoopTestUtils.KEY_0; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Test for the {@link WithAllKeysRetained} filter function. + */ +public class WithAllKeysRetainedTest extends GradoopFlinkTestBase { + + /** + * A key function that can be used with the filter function. + */ + private final LabelKeyFunction validKeyFunction = new LabelKeyFunction<>(); + + /** + * Another key function that can be used with the filter function. + */ + private final PropertyKeyFunction validKeyFunction2 = new PropertyKeyFunction<>(KEY_0); + + /** + * Test if the filter works as expected. + */ + @Test + public void testFilter() { + final WithAllKeysRetained filter = new WithAllKeysRetained<>(Arrays.asList( + validKeyFunction, validKeyFunction2)); + final Element vertex = getConfig().getLogicalGraphFactory().getVertexFactory().createVertex(); + assertTrue(filter.filter(vertex)); + vertex.setLabel("a"); + assertFalse(filter.filter(vertex)); + vertex.setLabel(""); + vertex.setProperty(KEY_0, BIG_DECIMAL_VAL_7); + assertFalse(filter.filter(vertex)); + vertex.setLabel("a"); + assertFalse(filter.filter(vertex)); + } +}