diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index f7ad33b15..be5340cde 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -1901,6 +1901,7 @@ public OperationFuture flush(final String prefix, final int delay) { Collection nodes = getAllNodes(); final BroadcastFuture rv = new BroadcastFuture(operationTimeout, Boolean.TRUE, nodes.size()); + final Map opsMap = new HashMap(); checkState(); for (MemcachedNode node : nodes) { @@ -1917,9 +1918,10 @@ public void complete() { rv.complete(); } }); - rv.addOp(op); - getMemcachedConnection().addOperation(node, op); + opsMap.put(node, op); } + rv.addOperations(opsMap.values()); + getMemcachedConnection().addOperations(opsMap); return rv; } diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index e280f4dad..5219b5c53 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -1549,6 +1549,7 @@ public Map getVersions() { final BroadcastFuture> future = new BroadcastFuture>( operationTimeout, result, nodes.size()); + final Map opsMap = new HashMap(); checkState(); for (MemcachedNode node : nodes) { @@ -1564,9 +1565,10 @@ public void complete() { future.complete(); } }); - future.addOp(op); - conn.addOperation(node, op); + opsMap.put(node, op); } + future.addOperations(opsMap.values()); + conn.addOperations(opsMap); Map rv = null; try { @@ -1612,6 +1614,7 @@ public Map> getStats(final String arg) { final BroadcastFuture>> future = new BroadcastFuture>>( operationTimeout, resultMap, nodes.size()); + final Map opsMap = new HashMap(); checkState(); for (MemcachedNode node : nodes) { @@ -1637,9 +1640,10 @@ public void complete() { future.complete(); } }); - future.addOp(op); - conn.addOperation(node, op); + opsMap.put(node, op); } + future.addOperations(opsMap.values()); + conn.addOperations(opsMap); Map> rv = null; try { @@ -1965,6 +1969,7 @@ public Future flush(final int delay) { Collection nodes = getAllNodes(); final BroadcastFuture rv = new BroadcastFuture(operationTimeout, Boolean.TRUE, nodes.size()); + final Map opsMap = new HashMap(); checkState(); for (MemcachedNode node : nodes) { @@ -1981,9 +1986,10 @@ public void complete() { rv.complete(); } }); - rv.addOp(op); - conn.addOperation(node, op); + opsMap.put(node, op); } + rv.addOperations(opsMap.values()); + conn.addOperations(opsMap); return rv; } @@ -2005,6 +2011,7 @@ public Set listSaslMechanisms() { final BroadcastFuture> future = new BroadcastFuture>( operationTimeout, resultMap, nodes.size()); + final Map opsMap = new HashMap(); checkState(); for (MemcachedNode node : nodes) { @@ -2021,9 +2028,10 @@ public void complete() { future.complete(); } }); - future.addOp(op); - conn.addOperation(node, op); + opsMap.put(node, op); } + future.addOperations(opsMap.values()); + conn.addOperations(opsMap); Set rv = null; try { diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index 9679b4219..b2bc4935b 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -1482,13 +1482,8 @@ public void addOperation(final MemcachedNode node, final Operation o) { public void addOperations(final Map ops) { for (Map.Entry me : ops.entrySet()) { - final MemcachedNode node = me.getKey(); - Operation o = me.getValue(); - node.addOpToInputQ(o); - addedQueue.offer(node); + addOperation(me.getKey(), me.getValue()); } - Selector s = selector.wakeup(); - assert s == selector : "Wakeup returned the wrong selector."; } public void wakeUpSelector() { diff --git a/src/main/java/net/spy/memcached/internal/BroadcastFuture.java b/src/main/java/net/spy/memcached/internal/BroadcastFuture.java index 46f746eb3..ec25a3007 100644 --- a/src/main/java/net/spy/memcached/internal/BroadcastFuture.java +++ b/src/main/java/net/spy/memcached/internal/BroadcastFuture.java @@ -84,8 +84,8 @@ public T get(long duration, TimeUnit units) return objRef.get(); } - public void addOp(Operation op) { - ops.add(op); + public void addOperations(Collection ops) { + this.ops.addAll(ops); } public void complete() {