Skip to content

Commit

Permalink
CURATOR-607: InterProcessReadWriteLock should expose exposing getLock…
Browse files Browse the repository at this point in the history
…Path (#394)

Co-authored-by: Nikita Sokolov <[email protected]>
  • Loading branch information
faucct and Nikita Sokolov authored Oct 3, 2021
1 parent 137159b commit ddadeea
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
*/
public class InterProcessReadWriteLock
{
private final InterProcessMutex readMutex;
private final InterProcessMutex writeMutex;
private final ReadLock readMutex;
private final WriteLock writeMutex;

// must be the same length. LockInternals depends on it
private static final String READ_LOCK_NAME = "__READ__";
Expand All @@ -82,33 +82,100 @@ private static class InternalInterProcessMutex extends InterProcessMutex
{
super(client, path, lockName, maxLeases, driver);
this.lockName = lockName;
this.lockData = lockData;
this.lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);
}

@Override
public Collection<String> getParticipantNodes() throws Exception
final public Collection<String> getParticipantNodes() throws Exception
{
Collection<String> nodes = super.getParticipantNodes();
Iterable<String> filtered = Iterables.filter
(
nodes,
new Predicate<String>()
{
@Override
public boolean apply(String node)
{
return node.contains(lockName);
}
return ImmutableList.copyOf(Iterables.filter(super.getParticipantNodes(), new Predicate<String>() {
@Override
public boolean apply(String node) {
return node.contains(lockName);
}
);
return ImmutableList.copyOf(filtered);
}));
}

@Override
protected byte[] getLockNodeBytes()
final protected byte[] getLockNodeBytes()
{
return lockData;
}

@Override
protected String getLockPath()
{
return super.getLockPath();
}
}

public static class WriteLock extends InternalInterProcessMutex
{
public WriteLock(CuratorFramework client, String basePath, byte[] lockData)
{
super(client, basePath, WRITE_LOCK_NAME, lockData, 1, new SortingLockInternalsDriver() {
@Override
public PredicateResults getsTheLock(
CuratorFramework client,
List<String> children,
String sequenceNodeName,
int maxLeases
) throws Exception {
return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
}
});
}

@Override
protected String getLockPath()
{
return super.getLockPath();
}
}

public static class ReadLock extends InternalInterProcessMutex {
public ReadLock(CuratorFramework client, String basePath, byte[] lockData, WriteLock writeLock)
{
super(client, basePath, READ_LOCK_NAME, lockData, Integer.MAX_VALUE, new SortingLockInternalsDriver() {
@Override
public PredicateResults getsTheLock(
CuratorFramework client,
List<String> children,
String sequenceNodeName,
int maxLeases
) throws Exception {
if (writeLock.isOwnedByCurrentThread()) {
return new PredicateResults(null, true);
}

int index = 0;
int firstWriteIndex = Integer.MAX_VALUE;
int ourIndex = -1;
for (String node : children) {
if (node.contains(WRITE_LOCK_NAME)) {
firstWriteIndex = Math.min(index, firstWriteIndex);
} else if (node.startsWith(sequenceNodeName)) {
ourIndex = index;
break;
}

++index;
}

validateOurIndex(sequenceNodeName, ourIndex);

boolean getsTheLock = (ourIndex < firstWriteIndex);
String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
return new PredicateResults(pathToWatch, getsTheLock);
}
});
}

@Override
protected String getLockPath()
{
return super.getLockPath();
}
}

/**
Expand All @@ -127,49 +194,22 @@ public InterProcessReadWriteLock(CuratorFramework client, String basePath)
*/
public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData)
{
lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);

writeMutex = new InternalInterProcessMutex
(
client,
basePath,
WRITE_LOCK_NAME,
lockData,
1,
new SortingLockInternalsDriver()
{
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
}
}
);

readMutex = new InternalInterProcessMutex
(
client,
basePath,
READ_LOCK_NAME,
lockData,
Integer.MAX_VALUE,
new SortingLockInternalsDriver()
{
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
return readLockPredicate(children, sequenceNodeName);
}
}
);
this.writeMutex = new WriteLock(client, basePath, lockData);
this.readMutex = new ReadLock(client, basePath, lockData, writeMutex);
}

protected InterProcessReadWriteLock(WriteLock writeLock, ReadLock readLock)
{
this.writeMutex = writeLock;
this.readMutex = readLock;
}

/**
* Returns the lock used for reading.
*
* @return read lock
*/
public InterProcessMutex readLock()
public ReadLock readLock()
{
return readMutex;
}
Expand All @@ -179,40 +219,8 @@ public InterProcessMutex readLock()
*
* @return write lock
*/
public InterProcessMutex writeLock()
public WriteLock writeLock()
{
return writeMutex;
}

private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception
{
if ( writeMutex.isOwnedByCurrentThread() )
{
return new PredicateResults(null, true);
}

int index = 0;
int firstWriteIndex = Integer.MAX_VALUE;
int ourIndex = -1;
for ( String node : children )
{
if ( node.contains(WRITE_LOCK_NAME) )
{
firstWriteIndex = Math.min(index, firstWriteIndex);
}
else if ( node.startsWith(sequenceNodeName) )
{
ourIndex = index;
break;
}

++index;
}

StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);

boolean getsTheLock = (ourIndex < firstWriteIndex);
String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
return new PredicateResults(pathToWatch, getsTheLock);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.KillSession;
import org.apache.zookeeper.KeeperException;
import org.junit.jupiter.api.Test;

import java.util.Collection;
Expand Down Expand Up @@ -362,4 +365,106 @@ private void doLocking(InterProcessLock lock, AtomicInteger concurrentCount, Ato
}
}
}

public static class LockPathInterProcessReadWriteLock extends InterProcessReadWriteLock
{
private final WriteLock writeLock;
private final ReadLock readLock;

public LockPathInterProcessReadWriteLock(CuratorFramework client, String basePath)
{
this(client, basePath, null);
}

public LockPathInterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData)
{
this(client, basePath, lockData, new WriteLock(client, basePath, lockData));
}

private LockPathInterProcessReadWriteLock(
CuratorFramework client,
String basePath,
byte[] lockData,
WriteLock writeLock
)
{
this(writeLock, new ReadLock(client, basePath, lockData, writeLock));
}

private LockPathInterProcessReadWriteLock(WriteLock writeLock, ReadLock readLock)
{
super(writeLock, readLock);
this.writeLock = writeLock;
this.readLock = readLock;
}

@Override
public WriteLock writeLock()
{
return writeLock;
}

@Override
public ReadLock readLock()
{
return readLock;
}

public static class WriteLock extends InterProcessReadWriteLock.WriteLock
{
private WriteLock(CuratorFramework client, String basePath, byte[] lockData)
{
super(client, basePath, lockData);
}

@Override
public String getLockPath()
{
return super.getLockPath();
}
}

public static class ReadLock extends InterProcessReadWriteLock.ReadLock
{
private ReadLock(CuratorFramework client, String basePath, byte[] lockData, WriteLock writeLock)
{
super(client, basePath, lockData, writeLock);
}

@Override
public String getLockPath()
{
return super.getLockPath();
}
}
}

@Test
public void testLockPath() throws Exception
{
CuratorFramework client1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
CuratorFramework client2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client1.start();
client2.start();
LockPathInterProcessReadWriteLock lock1 = new LockPathInterProcessReadWriteLock(client1, "/lock");
LockPathInterProcessReadWriteLock lock2 = new LockPathInterProcessReadWriteLock(client2, "/lock");
lock1.writeLock().acquire();
KillSession.kill(client1.getZookeeperClient().getZooKeeper());
lock2.readLock().acquire();
try {
client1.getData().forPath(lock1.writeLock().getLockPath());
fail("expected not to find node");
} catch (KeeperException.NoNodeException ignored) {
}
lock2.readLock().release();
lock1.writeLock().release();
}
finally
{
TestCleanState.closeAndTestClean(client2);
TestCleanState.closeAndTestClean(client1);
}
}
}

0 comments on commit ddadeea

Please sign in to comment.