ae skew join (apache#117)
* ae skew join

* add metrics to query stage

* No support for a union node whose children have differing numbers of partitions unless we explicitly repartition them.

* fix style

* release r40
7mming7 authored Jun 29, 2020
1 parent 5770a68 commit 528f909
Showing 109 changed files with 3,832 additions and 807 deletions.
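
The heart of the shuffle-side changes below is a new "continuous" shuffle block: a single fetch that covers several consecutive reduce partitions of one map output, which is what adaptive execution uses to split a skewed partition. A minimal, self-contained sketch of the two block-id formats involved (the class name and literal values here are illustrative, not from the patch; the parsing rule mirrors isShuffleBlock in the handler diff further down):

// Hypothetical illustration of the two block-id shapes this commit handles:
//   "shuffle_<shuffleId>_<mapId>_<reduceId>"             -> ShuffleBlockId (4 parts)
//   "shuffle_<shuffleId>_<mapId>_<reduceId>_<numBlocks>" -> ContinuousShuffleBlockId (5 parts)
public class BlockIdSketch {
  public static void main(String[] args) {
    for (String id : new String[] {"shuffle_0_5_2", "shuffle_0_5_2_4"}) {
      String[] parts = id.split("_");
      boolean isShuffle =
          (parts.length == 4 || parts.length == 5) && parts[0].equals("shuffle");
      int numBlocks = parts.length == 5 ? Integer.parseInt(parts[4]) : 1;
      System.out.printf("%s: shuffle=%b, starts at reduce %s, spans %d partition(s)%n",
          id, isShuffle, parts[3], numBlocks);
    }
  }
}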
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
 <parent>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-parent_2.11</artifactId>
-  <version>2.4.1-kylin-r35</version>
+  <version>2.4.1-kylin-r40</version>
   <relativePath>../pom.xml</relativePath>
 </parent>
2 changes: 1 addition & 1 deletion common/kvstore/pom.xml
@@ -22,7 +22,7 @@
 <parent>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-parent_2.11</artifactId>
-  <version>2.4.1-kylin-r35</version>
+  <version>2.4.1-kylin-r40</version>
   <relativePath>../../pom.xml</relativePath>
 </parent>
2 changes: 1 addition & 1 deletion common/network-common/pom.xml
@@ -22,7 +22,7 @@
 <parent>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-parent_2.11</artifactId>
-  <version>2.4.1-kylin-r35</version>
+  <version>2.4.1-kylin-r40</version>
   <relativePath>../../pom.xml</relativePath>
 </parent>
2 changes: 1 addition & 1 deletion common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
 <parent>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-parent_2.11</artifactId>
-  <version>2.4.1-kylin-r35</version>
+  <version>2.4.1-kylin-r40</version>
   <relativePath>../../pom.xml</relativePath>
 </parent>
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -197,48 +197,60 @@ public Map<String, Metric> getMetrics() {
     }
   }

+  private boolean isShuffleBlock(String[] blockIdParts) {
+    // length == 4: ShuffleBlockId
+    // length == 5: ContinuousShuffleBlockId
+    return (blockIdParts.length == 4 || blockIdParts.length == 5) &&
+      blockIdParts[0].equals("shuffle");
+  }
+
   private class ManagedBufferIterator implements Iterator<ManagedBuffer> {

     private int index = 0;
     private final String appId;
     private final String execId;
     private final int shuffleId;
-    // An array containing mapId and reduceId pairs.
-    private final int[] mapIdAndReduceIds;
+    // An array containing mapId, reduceId and numBlocks tuples.
+    private final int[] shuffleBlockIds;

     ManagedBufferIterator(String appId, String execId, String[] blockIds) {
       this.appId = appId;
       this.execId = execId;
       String[] blockId0Parts = blockIds[0].split("_");
-      if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
+      if (!isShuffleBlock(blockId0Parts)) {
         throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
       }
       this.shuffleId = Integer.parseInt(blockId0Parts[1]);
-      mapIdAndReduceIds = new int[2 * blockIds.length];
+      shuffleBlockIds = new int[3 * blockIds.length];
       for (int i = 0; i < blockIds.length; i++) {
         String[] blockIdParts = blockIds[i].split("_");
-        if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
+        if (!isShuffleBlock(blockIdParts)) {
           throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]);
         }
         if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
           throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
             ", got:" + blockIds[i]);
         }
-        mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
-        mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
+        shuffleBlockIds[3 * i] = Integer.parseInt(blockIdParts[2]);
+        shuffleBlockIds[3 * i + 1] = Integer.parseInt(blockIdParts[3]);
+        if (blockIdParts.length == 4) {
+          shuffleBlockIds[3 * i + 2] = 1;
+        } else {
+          shuffleBlockIds[3 * i + 2] = Integer.parseInt(blockIdParts[4]);
+        }
       }
     }

     @Override
     public boolean hasNext() {
-      return index < mapIdAndReduceIds.length;
+      return index < shuffleBlockIds.length;
     }

     @Override
     public ManagedBuffer next() {
       final ManagedBuffer block = blockManager.getBlockData(appId, execId, shuffleId,
-        mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]);
-      index += 2;
+        shuffleBlockIds[index], shuffleBlockIds[index + 1], shuffleBlockIds[index + 2]);
+      index += 3;
       metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
       return block;
     }
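The rewritten iterator flattens every requested block into a (mapId, reduceId, numBlocks) triple inside one int array, defaulting numBlocks to 1 for a plain ShuffleBlockId. A small standalone sketch of that stride-3 walk (the getBlockData call is left as a comment, since it needs a live shuffle service):

public class TripleWalkSketch {
  public static void main(String[] args) {
    // Flattened (mapId, reduceId, numBlocks) triples, one per requested block id.
    int[] shuffleBlockIds = {
        5, 2, 1,  // "shuffle_<s>_5_2"   -> single reduce partition 2 of map 5
        7, 0, 4   // "shuffle_<s>_7_0_4" -> reduce partitions 0..3 of map 7 as one block
    };
    for (int index = 0; index < shuffleBlockIds.length; index += 3) {
      int mapId = shuffleBlockIds[index];
      int reduceId = shuffleBlockIds[index + 1];
      int numBlocks = shuffleBlockIds[index + 2];
      // In the handler: blockManager.getBlockData(appId, execId, shuffleId,
      //                                           mapId, reduceId, numBlocks);
      System.out.printf("map %d: serve %d partition(s) starting at reduce %d%n",
          mapId, numBlocks, reduceId);
    }
  }
}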
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -162,21 +162,22 @@ public void registerExecutor(
   }

   /**
-   * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
+   * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId, numBlocks). We make assumptions
    * about how the hash and sort based shuffles store their data.
    */
   public ManagedBuffer getBlockData(
       String appId,
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int numBlocks) {
     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
-        String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
+          String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
-    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, numBlocks);
   }

   /**
@@ -280,19 +281,19 @@ public boolean accept(File dir, String name) {
    * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
    */
   private ManagedBuffer getSortBasedShuffleBlockData(
-      ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
+      ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId, int numBlocks) {
     File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
-      "shuffle_" + shuffleId + "_" + mapId + "_0.index");
+        "shuffle_" + shuffleId + "_" + mapId + "_0.index");

     try {
       ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
-      ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
+      ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId, numBlocks);
       return new FileSegmentManagedBuffer(
-        conf,
-        getFile(executor.localDirs, executor.subDirsPerLocalDir,
-          "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
-        shuffleIndexRecord.getOffset(),
-        shuffleIndexRecord.getLength());
+          conf,
+          getFile(executor.localDirs, executor.subDirsPerLocalDir,
+            "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
+          shuffleIndexRecord.getOffset(),
+          shuffleIndexRecord.getLength());
     } catch (ExecutionException e) {
       throw new RuntimeException("Failed to open file: " + indexFile, e);
     }
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
@@ -59,9 +59,9 @@ public int getSize() {
   /**
    * Get index offset for a particular reducer.
    */
-  public ShuffleIndexRecord getIndex(int reduceId) {
+  public ShuffleIndexRecord getIndex(int reduceId, int numBlocks) {
     long offset = offsets.get(reduceId);
-    long nextOffset = offsets.get(reduceId + 1);
+    long nextOffset = offsets.get(reduceId + numBlocks);
     return new ShuffleIndexRecord(offset, nextOffset - offset);
   }
 }
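
Why a one-line change suffices here: sort-based shuffle lays a map task's reduce partitions back to back in a single data file, and the .index file stores cumulative offsets (numPartitions + 1 entries). A run of numBlocks consecutive partitions is therefore one contiguous (offset, length) segment. A worked example with made-up offsets:

public class GetIndexSketch {
  public static void main(String[] args) {
    // offsets[i] = byte offset where reduce partition i starts; last entry = file size.
    long[] offsets = {0L, 100L, 250L, 400L, 700L};
    int reduceId = 1;
    int numBlocks = 2;  // read partitions 1 and 2 as one continuous block
    long offset = offsets[reduceId];                       // 100
    long length = offsets[reduceId + numBlocks] - offset;  // 400 - 100 = 300
    // getIndex(1, 2) above returns exactly this segment; getIndex(1, 1) is the old behavior.
    System.out.println("segment: offset=" + offset + ", length=" + length);
  }
}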
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -83,8 +83,8 @@ public void testOpenShuffleBlocks() {
   ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
   ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
-  when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0)).thenReturn(block0Marker);
-  when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1)).thenReturn(block1Marker);
+  when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0, 1)).thenReturn(block0Marker);
+  when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1, 1)).thenReturn(block1Marker);
   ByteBuffer openBlocks = new OpenBlocks("app0", "exec1",
     new String[] { "shuffle_0_0_0", "shuffle_0_0_1" })
     .toByteBuffer();
@@ -106,8 +106,8 @@ public void testOpenShuffleBlocks() {
   assertEquals(block0Marker, buffers.next());
   assertEquals(block1Marker, buffers.next());
   assertFalse(buffers.hasNext());
-  verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0);
-  verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1);
+  verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0, 1);
+  verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1, 1);

   // Verify open block request latency metrics
   Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -66,7 +66,7 @@ public void testBadRequests() throws IOException {
   ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
   // Unregistered executor
   try {
-    resolver.getBlockData("app0", "exec1", 1, 1, 0);
+    resolver.getBlockData("app0", "exec1", 1, 1, 0, 1);
     fail("Should have failed");
   } catch (RuntimeException e) {
     assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
@@ -75,7 +75,7 @@ public void testBadRequests() throws IOException {
   // Invalid shuffle manager
   try {
     resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
-    resolver.getBlockData("app0", "exec2", 1, 1, 0);
+    resolver.getBlockData("app0", "exec2", 1, 1, 0, 1);
     fail("Should have failed");
   } catch (UnsupportedOperationException e) {
     // pass
@@ -85,7 +85,7 @@
   resolver.registerExecutor("app0", "exec3",
     dataContext.createExecutorInfo(SORT_MANAGER));
   try {
-    resolver.getBlockData("app0", "exec3", 1, 1, 0);
+    resolver.getBlockData("app0", "exec3", 1, 1, 0, 1);
     fail("Should have failed");
   } catch (Exception e) {
     // pass
@@ -99,18 +99,25 @@ public void testSortShuffleBlocks() throws IOException {
     dataContext.createExecutorInfo(SORT_MANAGER));

   InputStream block0Stream =
-    resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream();
+    resolver.getBlockData("app0", "exec0", 0, 0, 0, 1).createInputStream();
   String block0 = CharStreams.toString(
     new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
   block0Stream.close();
   assertEquals(sortBlock0, block0);

   InputStream block1Stream =
-    resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream();
+    resolver.getBlockData("app0", "exec0", 0, 0, 1, 1).createInputStream();
   String block1 = CharStreams.toString(
     new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
   block1Stream.close();
   assertEquals(sortBlock1, block1);

+  InputStream block01Stream =
+    resolver.getBlockData("app0", "exec0", 0, 0, 0, 2).createInputStream();
+  String block01 = CharStreams.toString(
+    new InputStreamReader(block01Stream, StandardCharsets.UTF_8));
+  block01Stream.close();
+  assertEquals(sortBlock0 + sortBlock1, block01);
 }

 @Test
2 changes: 1 addition & 1 deletion common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
 <parent>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-parent_2.11</artifactId>
-  <version>2.4.1-kylin-r35</version>
+  <version>2.4.1-kylin-r40</version>
   <relativePath>../../pom.xml</relativePath>
 </parent>
2 changes: 1 addition & 1 deletion common/sketch/pom.xml
@@ -22,7 +22,7 @@
 <parent>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-parent_2.11</artifactId>
-  <version>2.4.1-kylin-r35</version>
+  <version>2.4.1-kylin-r40</version>
   <relativePath>../../pom.xml</relativePath>
 </parent>
2 changes: 1 addition & 1 deletion common/tags/pom.xml
@@ -22,7 +22,7 @@
 <parent>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-parent_2.11</artifactId>
-  <version>2.4.1-kylin-r35</version>
+  <version>2.4.1-kylin-r40</version>
   <relativePath>../../pom.xml</relativePath>
 </parent>
2 changes: 1 addition & 1 deletion common/unsafe/pom.xml
@@ -22,7 +22,7 @@
 <parent>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-parent_2.11</artifactId>
-  <version>2.4.1-kylin-r35</version>
+  <version>2.4.1-kylin-r40</version>
   <relativePath>../../pom.xml</relativePath>
 </parent>
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -21,7 +21,7 @@
 <parent>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-parent_2.11</artifactId>
-  <version>2.4.1-kylin-r35</version>
+  <version>2.4.1-kylin-r40</version>
   <relativePath>../pom.xml</relativePath>
 </parent>
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -125,7 +125,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
     if (!records.hasNext()) {
       partitionLengths = new long[numPartitions];
       shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
-      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
+      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, new long[numPartitions]);
       return;
     }
     final SerializerInstance serInstance = serializer.newInstance();
@@ -159,15 +159,18 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {

     File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
     File tmp = Utils.tempFileWith(output);
+    MapInfo mapInfo;
     try {
-      partitionLengths = writePartitionedFile(tmp);
+      mapInfo = writePartitionedFile(tmp);
+      partitionLengths = mapInfo.lengths;
       shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
     } finally {
       if (tmp.exists() && !tmp.delete()) {
         logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
       }
     }
-    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
+    mapStatus = MapStatus$.MODULE$.apply(
+      blockManager.shuffleServerId(), mapInfo.lengths, mapInfo.records);
   }

   @VisibleForTesting
@@ -180,12 +183,13 @@ long[] getPartitionLengths() {
    *
    * @return array of lengths, in bytes, of each partition of the file (used by map output tracker).
    */
-  private long[] writePartitionedFile(File outputFile) throws IOException {
+  private MapInfo writePartitionedFile(File outputFile) throws IOException {
     // Track location of the partition starts in the output file
     final long[] lengths = new long[numPartitions];
+    final long[] records = new long[numPartitions];
     if (partitionWriters == null) {
       // We were passed an empty iterator
-      return lengths;
+      return new MapInfo(lengths, records);
     }

     final FileOutputStream out = new FileOutputStream(outputFile, true);
@@ -194,6 +198,7 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
     try {
       for (int i = 0; i < numPartitions; i++) {
         final File file = partitionWriterSegments[i].file();
+        records[i] = partitionWriterSegments[i].record();
         if (file.exists()) {
           final FileInputStream in = new FileInputStream(file);
           boolean copyThrewException = true;
@@ -214,7 +219,7 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
       writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
     }
     partitionWriters = null;
-    return lengths;
+    return new MapInfo(lengths, records);
   }

   @Override
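Besides byte lengths, the writer now counts how many records landed in each partition and ships both arrays to the driver through MapStatus. A hedged sketch of the bookkeeping idea (the types, distribution, and 8-byte record size are invented for illustration; in the diff above the real counts come from the per-partition writer segments):

import java.util.Arrays;

public class RecordCountSketch {
  public static void main(String[] args) {
    int numPartitions = 4;
    long[] lengths = new long[numPartitions];  // bytes written per reduce partition
    long[] records = new long[numPartitions];  // rows written per reduce partition
    int[] partitionOfRow = {0, 3, 3, 3, 3, 3, 1, 3};  // a skewed key distribution
    for (int p : partitionOfRow) {
      records[p]++;
      lengths[p] += 8;  // pretend each row serializes to 8 bytes
    }
    // MapInfo(lengths, records) bundles both arrays; MapStatus carries them to the driver.
    System.out.println("records per partition = " + Arrays.toString(records));
  }
}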
28 changes: 28 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/sort/MapInfo.java
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort;

public final class MapInfo {
  final long[] lengths;
  final long[] records;

  public MapInfo(long[] lengths, long[] records) {
    this.lengths = lengths;
    this.records = records;
  }
}
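
With record counts available in the map output statistics, the adaptive planner can single out a reduce partition as skewed when it dwarfs its peers, for example by comparing against a multiple of the median. The heuristic below is a sketch of that idea only; the threshold factor and exact rule are not taken from this commit:

import java.util.Arrays;

public class SkewDetectionSketch {
  // True if the partition holds more than `factor` times the median record count.
  static boolean isSkewed(long[] records, int partition, double factor) {
    long[] sorted = records.clone();
    Arrays.sort(sorted);
    long median = sorted[sorted.length / 2];
    return records[partition] > Math.max((long) (median * factor), 1L);
  }

  public static void main(String[] args) {
    long[] records = {120L, 95L, 110L, 9000L};  // partition 3 is heavily skewed
    for (int p = 0; p < records.length; p++) {
      System.out.println("partition " + p + " skewed: " + isSkewed(records, p, 5.0));
    }
  }
}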
