Skip to content

Commit

Permalink
INTERNAL: Using addOperations for atomicity in Broadcast Op.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 authored and uhm0311 committed Apr 2, 2024
1 parent 7aec6f0 commit ae2a82f
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 18 deletions.
6 changes: 4 additions & 2 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1901,6 +1901,7 @@ public OperationFuture<Boolean> flush(final String prefix, final int delay) {
Collection<MemcachedNode> nodes = getAllNodes();
final BroadcastFuture<Boolean> rv
= new BroadcastFuture<Boolean>(operationTimeout, Boolean.TRUE, nodes.size());
final Map<MemcachedNode, Operation> opsMap = new HashMap<MemcachedNode, Operation>();

checkState();
for (MemcachedNode node : nodes) {
Expand All @@ -1917,9 +1918,10 @@ public void complete() {
rv.complete();
}
});
rv.addOp(op);
getMemcachedConnection().addOperation(node, op);
opsMap.put(node, op);
}
rv.addOperations(opsMap.values());
getMemcachedConnection().addOperations(opsMap);
return rv;
}

Expand Down
24 changes: 16 additions & 8 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1549,6 +1549,7 @@ public Map<SocketAddress, String> getVersions() {
final BroadcastFuture<Map<SocketAddress, String>> future
= new BroadcastFuture<Map<SocketAddress, String>>(
operationTimeout, result, nodes.size());
final Map<MemcachedNode, Operation> opsMap = new HashMap<MemcachedNode, Operation>();

checkState();
for (MemcachedNode node : nodes) {
Expand All @@ -1564,9 +1565,10 @@ public void complete() {
future.complete();
}
});
future.addOp(op);
conn.addOperation(node, op);
opsMap.put(node, op);
}
future.addOperations(opsMap.values());
conn.addOperations(opsMap);

Map<SocketAddress, String> rv = null;
try {
Expand Down Expand Up @@ -1612,6 +1614,7 @@ public Map<SocketAddress, Map<String, String>> getStats(final String arg) {
final BroadcastFuture<Map<SocketAddress, Map<String, String>>> future
= new BroadcastFuture<Map<SocketAddress, Map<String, String>>>(
operationTimeout, resultMap, nodes.size());
final Map<MemcachedNode, Operation> opsMap = new HashMap<MemcachedNode, Operation>();

checkState();
for (MemcachedNode node : nodes) {
Expand All @@ -1637,9 +1640,10 @@ public void complete() {
future.complete();
}
});
future.addOp(op);
conn.addOperation(node, op);
opsMap.put(node, op);
}
future.addOperations(opsMap.values());
conn.addOperations(opsMap);

Map<SocketAddress, Map<String, String>> rv = null;
try {
Expand Down Expand Up @@ -1965,6 +1969,7 @@ public Future<Boolean> flush(final int delay) {
Collection<MemcachedNode> nodes = getAllNodes();
final BroadcastFuture<Boolean> rv
= new BroadcastFuture<Boolean>(operationTimeout, Boolean.TRUE, nodes.size());
final Map<MemcachedNode, Operation> opsMap = new HashMap<MemcachedNode, Operation>();

checkState();
for (MemcachedNode node : nodes) {
Expand All @@ -1981,9 +1986,10 @@ public void complete() {
rv.complete();
}
});
rv.addOp(op);
conn.addOperation(node, op);
opsMap.put(node, op);
}
rv.addOperations(opsMap.values());
conn.addOperations(opsMap);
return rv;
}

Expand All @@ -2005,6 +2011,7 @@ public Set<String> listSaslMechanisms() {
final BroadcastFuture<ConcurrentMap<String, String>> future
= new BroadcastFuture<ConcurrentMap<String, String>>(
operationTimeout, resultMap, nodes.size());
final Map<MemcachedNode, Operation> opsMap = new HashMap<MemcachedNode, Operation>();

checkState();
for (MemcachedNode node : nodes) {
Expand All @@ -2021,9 +2028,10 @@ public void complete() {
future.complete();
}
});
future.addOp(op);
conn.addOperation(node, op);
opsMap.put(node, op);
}
future.addOperations(opsMap.values());
conn.addOperations(opsMap);

Set<String> rv = null;
try {
Expand Down
7 changes: 1 addition & 6 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -1482,13 +1482,8 @@ public void addOperation(final MemcachedNode node, final Operation o) {

public void addOperations(final Map<MemcachedNode, Operation> ops) {
for (Map.Entry<MemcachedNode, Operation> me : ops.entrySet()) {
final MemcachedNode node = me.getKey();
Operation o = me.getValue();
node.addOpToInputQ(o);
addedQueue.offer(node);
addOperation(me.getKey(), me.getValue());
}
Selector s = selector.wakeup();
assert s == selector : "Wakeup returned the wrong selector.";
}

public void wakeUpSelector() {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/net/spy/memcached/internal/BroadcastFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public T get(long duration, TimeUnit units)
return objRef.get();
}

public void addOp(Operation op) {
ops.add(op);
public void addOperations(Collection<Operation> ops) {
this.ops.addAll(ops);
}

public void complete() {
Expand Down

0 comments on commit ae2a82f

Please sign in to comment.