diff --git a/src/main/java/net/spy/memcached/internal/BulkOperationFuture.java b/src/main/java/net/spy/memcached/internal/BulkOperationFuture.java index 28aaa997e..c4350b257 100644 --- a/src/main/java/net/spy/memcached/internal/BulkOperationFuture.java +++ b/src/main/java/net/spy/memcached/internal/BulkOperationFuture.java @@ -3,6 +3,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -90,16 +91,21 @@ public Map get(long duration, MemcachedConnection.opsSucceeded(ops); } + List exceptions = new ArrayList<>(); for (Operation op : ops) { if (op != null && op.hasErrored()) { - throw new ExecutionException(op.getException()); + exceptions.add(op.getException()); } if (op != null && op.isCancelled()) { - throw new ExecutionException(new RuntimeException(op.getCancelCause())); + exceptions.add(new RuntimeException(op.getCancelCause())); } } + if (!exceptions.isEmpty()) { + throw new CompositeException(exceptions); + } + return failedResult; } diff --git a/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java b/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java index 52da18328..62d63b8ce 100644 --- a/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java @@ -36,6 +36,8 @@ import net.spy.memcached.ops.OperationType; import net.spy.memcached.ops.StatusCode; +import static net.spy.memcached.ops.OperationErrorType.CLIENT; + /** * Base class for protocol-specific operation implementations. */ @@ -51,7 +53,7 @@ public abstract class BaseOperationImpl extends SpyObject { private boolean cancelled = false; private final AtomicBoolean callbacked = new AtomicBoolean(false); private String cancelCause = null; - private OperationException exception = null; + protected OperationException exception = null; private OperationCallback callback = null; private volatile MemcachedNode handlingNode = null; @@ -241,25 +243,15 @@ public final void writeComplete() { protected void handleError(OperationErrorType eType, String line) throws IOException { getLogger().error("Error: %s by %s", line, this); - switch (eType) { - case GENERAL: - case SERVER: - exception = new OperationException(eType, line + " @ " + handlingNode.getNodeName()); - break; - case CLIENT: - if (line.contains("bad command line format")) { - initialize(); - byte[] bytes = new byte[cmd.remaining()]; - cmd.get(bytes); - - String[] cmdLines = new String(bytes).split("\r\n"); - getLogger().error("Bad command: %s", cmdLines[0]); - } - exception = new OperationException(eType, line + " @ " + handlingNode.getNodeName()); - break; - default: - assert false; + if (eType == CLIENT && line.contains("bad command line format")) { + initialize(); + byte[] bytes = new byte[cmd.remaining()]; + cmd.get(bytes); + + String[] cmdLines = new String(bytes).split("\r\n"); + getLogger().error("Bad command: %s", cmdLines[0]); } + exception = new OperationException(eType, line + " @ " + handlingNode.getNodeName()); transitionState(OperationState.COMPLETE); throw exception; } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java index e472f7e8a..5122b72c0 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java @@ -17,6 +17,7 @@ */ package net.spy.memcached.protocol.ascii; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; @@ -27,6 +28,8 @@ import net.spy.memcached.ops.CollectionOperationStatus; import net.spy.memcached.ops.CollectionPipedInsertOperation; import net.spy.memcached.ops.OperationCallback; +import net.spy.memcached.ops.OperationErrorType; +import net.spy.memcached.ops.OperationException; import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.OperationType; @@ -66,7 +69,7 @@ public final class CollectionPipedInsertOperationImpl extends OperationImpl private final CollectionPipedInsert insert; private final CollectionPipedInsertOperation.Callback cb; - private int count; + private int count = 0; private int index = 0; private boolean successAll = true; @@ -99,9 +102,11 @@ assert getState() == OperationState.READING if (hasSwitchedOver(line)) { this.insert.setNextOpIndex(index); prepareSwitchover(line); + count = 0; return; } /* ENABLE_REPLICATION end */ + /* ENABLE_MIGRATION if */ if (hasNotMyKey(line)) { // Only one NOT_MY_KEY is provided in response of single key piped operation when redirection. @@ -110,6 +115,7 @@ assert getState() == OperationState.READING transitionState(OperationState.REDIRECT); } else { insert.setNextOpIndex(index); + count = 0; } return; } @@ -126,7 +132,6 @@ assert getState() == OperationState.READING cb.receivedStatus(FAILED_END); } transitionState(OperationState.COMPLETE); - return; } /* @@ -168,6 +173,16 @@ assert getState() == OperationState.READING } } + @Override + protected void handleError(OperationErrorType eType, String line) throws IOException { + if (count == 0) { + super.handleError(eType, line); + } else { + getLogger().error("Error: %s by %s", line, this); + exception = new OperationException(eType, line + " @ " + getHandlingNode().getNodeName()); + } + } + @Override public void initialize() { ByteBuffer buffer = insert.getAsciiCommand(); diff --git a/src/main/java/net/spy/memcached/protocol/ascii/OperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/OperationImpl.java index 37a8d7723..231436cb0 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/OperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/OperationImpl.java @@ -163,6 +163,9 @@ public void readFromBuffer(ByteBuffer data) throws IOException { } else { // OperationReadType.DATA handleRead(data); } + if (isPipeOperation() && getState() == OperationState.COMPLETE && hasErrored()) { + throw getException(); + } } } diff --git a/src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java b/src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java index 65cd8b8cc..5d0ead20a 100644 --- a/src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java +++ b/src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java @@ -19,14 +19,24 @@ package net.spy.memcached.protocol.ascii; +import java.net.InetSocketAddress; import java.nio.Buffer; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +import net.spy.memcached.collection.CollectionPipedInsert; +import net.spy.memcached.ops.CollectionPipedInsertOperation; +import net.spy.memcached.ops.Operation; +import net.spy.memcached.ops.OperationCallback; +import net.spy.memcached.ops.OperationException; +import net.spy.memcached.ops.OperationStatus; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; @@ -99,6 +109,51 @@ void testPartialLine() throws Exception { assertEquals("this is a test", op.getCurrentLine()); } + @Test + void throwExceptionAfterReadingEndOrPipeError() throws Exception { + String key = "testPipeLine"; + CollectionPipedInsert.ListPipedInsert insert = + new CollectionPipedInsert.ListPipedInsert<>(key, 0, + Arrays.asList("a", "b"), null, null); + OperationCallback cb = new CollectionPipedInsertOperation.Callback() { + @Override + public void receivedStatus(OperationStatus status) { + } + + @Override + public void complete() { + } + + @Override + public void gotStatus(Integer index, OperationStatus status) { + } + }; + CollectionPipedInsertOperationImpl op = + new CollectionPipedInsertOperationImpl("test", insert, cb); + LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + op.setHandlingNode(new AsciiMemcachedNodeImpl("testnode", new InetSocketAddress(11211), + 60, queue, queue, queue, 0L)); + + ByteBuffer b = ByteBuffer.allocate(40); + String line1 = "RESPONSE 2\r\n"; + op.writeComplete(); + b.put(line1.getBytes()); + b.flip(); + assertDoesNotThrow(() -> op.readFromBuffer(b)); + b.clear(); + + String line2 = "SERVER_ERROR out of memory\r\n"; + b.put(line2.getBytes()); + b.flip(); + assertDoesNotThrow(() -> op.readFromBuffer(b)); + b.clear(); + + String line4 = "PIPE_ERROR failed\r\n"; + b.put(line4.getBytes()); + b.flip(); + assertThrows(OperationException.class, () -> op.readFromBuffer(b)); + } + private static class SimpleOp extends OperationImpl { private final LinkedList lines = new LinkedList<>();