Skip to content

Commit

Permalink
[CELEBORN-1567] Support throw FetchFailedException when Data corrupti…
Browse files Browse the repository at this point in the history
…on detected

### What changes were proposed in this pull request?

### Why are the changes needed?
#2655 (review)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
GA

Closes #2691 from cxzl25/CELEBORN-1567.

Authored-by: sychen <[email protected]>
Signed-off-by: Shaoyun Chen <[email protected]>
(cherry picked from commit b8f275d)
Signed-off-by: Shaoyun Chen <[email protected]>
  • Loading branch information
cxzl25 committed Aug 20, 2024
1 parent b5ac49f commit 08716da
Showing 1 changed file with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@

import scala.Tuple2;

import com.github.luben.zstd.ZstdException;
import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.buffer.ByteBuf;
import net.jpountz.lz4.LZ4Exception;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -664,15 +666,20 @@ private boolean fillBuffer() throws IOException {
}

return hasData;
} catch (IOException e) {
} catch (LZ4Exception | ZstdException | IOException e) {
logger.error(
"Failed to fill buffer from chunk. AppShuffleId {}, shuffleId {}, partitionId {}, location {}",
appShuffleId,
shuffleId,
partitionId,
currentReader.getLocation(),
e);
IOException ioe = e;
IOException ioe;
if (e instanceof IOException) {
ioe = (IOException) e;
} else {
ioe = new IOException(e);
}
if (exceptionMaker != null) {
if (shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId)) {
/*
Expand All @@ -689,7 +696,7 @@ private boolean fillBuffer() throws IOException {
throw ioe;
} catch (Exception e) {
logger.error(
"Failed to read data from chunk. AppShuffleId {}, shuffleId {}, partitionId {}, location {}",
"Failed to fill buffer from chunk. AppShuffleId {}, shuffleId {}, partitionId {}, location {}",
appShuffleId,
shuffleId,
partitionId,
Expand Down

0 comments on commit 08716da

Please sign in to comment.