Skip to content

Commit

Permalink
Add StartupLocker (#12)
Browse files Browse the repository at this point in the history
* Add StartupLocker

Add the following listeners as well:
* StartupJettyLifeCycleListener
* StartupWithLockJettyLifeCycleListener

Bump kiwi-parent, kiwi, kiwi-test
Add dropwizard-curator

* Fixing versions

* Update logic from code review suggestions

Closes #5

* Update src/main/java/org/kiwiproject/dropwizard/util/startup/StartupLocker.java

Co-authored-by: Scott Leberknight <[email protected]>

* Update src/main/java/org/kiwiproject/dropwizard/util/startup/StartupLocker.java

Co-authored-by: Scott Leberknight <[email protected]>

* Update src/main/java/org/kiwiproject/dropwizard/util/startup/StartupLocker.java

Co-authored-by: Scott Leberknight <[email protected]>

* Move StartupLockInfo to top level class and add validations during construction

* Change WARN log message in releaseStartupLockIfPresent to INFO

Co-authored-by: Scott Leberknight <[email protected]>
  • Loading branch information
chrisrohr and sleberknight authored Nov 29, 2020
1 parent b52c2ef commit c8038b6
Show file tree
Hide file tree
Showing 10 changed files with 767 additions and 1 deletion.
57 changes: 57 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
<properties>
<!-- Versions for required dependencies -->
<dropwizard.version>2.0.16</dropwizard.version>
<dropwizard-curator.version>0.14.0</dropwizard-curator.version>
<kiwi.version>0.17.0</kiwi.version>
<zookeeper.version>3.4.14</zookeeper.version>

<!-- Versions for provided dependencies -->
<hibernate-validator.version>6.1.6.Final</hibernate-validator.version>
Expand All @@ -42,6 +44,7 @@
<!-- Versions for optional dependencies -->

<!-- Versions for test dependencies -->
<curator.version>2.13.0</curator.version>
<jackson.version>2.10.5</jackson.version>
<kiwi-test.version>0.12.0</kiwi-test.version>

Expand Down Expand Up @@ -70,6 +73,18 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.kiwiproject</groupId>
<artifactId>dropwizard-curator</artifactId>
<version>${dropwizard-curator.version}</version>
<exclusions>
<exclusion>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-jackson</artifactId>
Expand All @@ -92,6 +107,23 @@
<version>${kiwi.version}</version>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Provided dependencies -->

<dependency>
Expand Down Expand Up @@ -161,6 +193,31 @@

<!-- Test dependencies -->

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>${curator.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-testing</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.kiwiproject.dropwizard.util.startup;

import static org.kiwiproject.base.KiwiPreconditions.requireNotBlank;
import static org.kiwiproject.base.KiwiPreconditions.requireNotNull;

import lombok.Builder;
import lombok.Getter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessLock;

/**
* A value class that contains information about a startup lock, such as whether a lock was successfully
* acquired, the lock path, the lock itself, as well as information when any exception occurs.
* <p>
* NOTE: There is an assumption here that any users of this class will check the {@code lockState} prior to accessing other fields.
* Not all fields are populated for every state. For instance, {@code client}, {@code lock}, and {@code lockPath} are only used when
* lock state is {@code ACQUIRED}, where as {@code exception} is only used when lock state is {@code ACQUIRE_FAIL}.
*/
@Getter
public class StartupLockInfo {

public enum LockState {
NOT_ATTEMPTED, ACQUIRED, ACQUIRE_FAIL
}

private CuratorFramework client;
private InterProcessLock lock;
private String lockPath;
private final LockState lockState;
private final String infoMessage;
private Exception exception;

@Builder
StartupLockInfo(CuratorFramework client,
InterProcessLock lock,
String lockPath,
LockState lockState,
String infoMessage,
Exception exception) {

this.infoMessage = requireNotBlank(infoMessage, "infoMessage is required");
this.lockState = requireNotNull(lockState, "lockState is required");

if (lockState == LockState.ACQUIRED) {
this.client = requireNotNull(client, "client is required");
this.lock = requireNotNull(lock, "lock is required");
this.lockPath = requireNotBlank(lockPath, "lockPath is required");
} else if (lockState == LockState.ACQUIRE_FAIL) {
this.exception = requireNotNull(exception, "exception is required");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package org.kiwiproject.dropwizard.util.startup;

import static org.kiwiproject.base.KiwiStrings.format;

import io.dropwizard.setup.Environment;
import io.dropwizard.util.Duration;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.kiwiproject.curator.CuratorFrameworkHelper;
import org.kiwiproject.curator.CuratorLockHelper;
import org.kiwiproject.curator.config.CuratorConfig;
import org.kiwiproject.curator.exception.LockAcquisitionException;
import org.kiwiproject.curator.zookeeper.ZooKeeperAvailabilityChecker;
import org.kiwiproject.dropwizard.util.startup.PortAssigner.PortAssignment;
import org.kiwiproject.dropwizard.util.startup.listener.StartupJettyLifeCycleListener;
import org.kiwiproject.dropwizard.util.startup.listener.StartupWithLockJettyLifeCycleListener;

import java.util.Optional;

/**
* Utility to acquire and release a lock from ZooKeeper during startup of a Dropwizard service.
* <p>
* This is useful if you have multiple services on the same host or container, and each service
* needs to access resources without contention. For example, if multiple services start simultaneously
* and they are all attempting to obtain ports dynamically from a limited port range, you can use this
* class to ensure only one attempts to obtain ports at a time to avoid "Address already in use" errors.
*/
@Slf4j
@Getter(AccessLevel.PACKAGE) // For testing
public class StartupLocker {

private final ZooKeeperAvailabilityChecker zkAvailabilityChecker;
private final CuratorFrameworkHelper curatorFrameworkHelper;
private final CuratorLockHelper curatorLockHelper;
private final SystemExecutioner executioner;

@Builder
private StartupLocker(SystemExecutioner executioner,
ZooKeeperAvailabilityChecker zkAvailabilityChecker,
CuratorFrameworkHelper curatorFrameworkHelper,
CuratorLockHelper curatorLockHelper) {
this.executioner = Optional.ofNullable(executioner).orElseGet(SystemExecutioner::new);
this.zkAvailabilityChecker = Optional.ofNullable(zkAvailabilityChecker).orElseGet(ZooKeeperAvailabilityChecker::new);
this.curatorFrameworkHelper = Optional.ofNullable(curatorFrameworkHelper).orElseGet(CuratorFrameworkHelper::new);
this.curatorLockHelper = Optional.ofNullable(curatorLockHelper).orElseGet(CuratorLockHelper::new);
}

/**
* Attempts to acquire a lock from ZooKeeper during startup.
*
* @param lockPath the path in ZooKeeper to store the lock
* @param lockTimeout the amount of time to wait for the lock to be acquired
* @param curatorConfig the Curator configuration
* @param environment the Dropwizard environment
* @return information about the attempted lock, whether it was obtained, etc. Clients are expected check the
* lock state contained in this object, and take appropriate actions.
*/
public StartupLockInfo acquireStartupLock(String lockPath,
Duration lockTimeout,
PortAssignment assignment,
CuratorConfig curatorConfig,
Environment environment) {

if (assignment == PortAssignment.STATIC) {
return StartupLockInfo.builder()
.lockState(StartupLockInfo.LockState.NOT_ATTEMPTED)
.infoMessage("Using static port assignment. Lock not needed.")
.build();
}

if (zkAvailabilityChecker.anyZooKeepersAvailable(curatorConfig)) {
var curatorFramework = curatorFrameworkHelper.startCuratorFramework(curatorConfig);
var lock = curatorLockHelper.createInterProcessMutex(curatorFramework, lockPath);

try {
tryAcquireStartupLock(lock, lockPath, lockTimeout);
environment.lifecycle().addLifeCycleListener(
new StartupWithLockJettyLifeCycleListener(curatorFramework, lock, lockPath, executioner));

return StartupLockInfo.builder()
.client(curatorFramework)
.lock(lock)
.lockPath(lockPath)
.lockState(StartupLockInfo.LockState.ACQUIRED)
.infoMessage("Lock acquired")
.build();
} catch (LockAcquisitionException e) {
LOG.warn("Lock on path [{}] not obtained. Closing Curator.", lockPath);
curatorFrameworkHelper.closeQuietly(curatorFramework);

return StartupLockInfo.builder()
.lockState(StartupLockInfo.LockState.ACQUIRE_FAIL)
.infoMessage("Failed to obtain startup lock")
.exception(e)
.build();
}
}

return StartupLockInfo.builder()
.lockState(StartupLockInfo.LockState.NOT_ATTEMPTED)
.infoMessage(format("No ZooKeepers are available from connect string [{}]", curatorConfig.getZkConnectString()))
.build();
}

private void tryAcquireStartupLock(InterProcessMutex lock, String lockPath, Duration lockTimeout) {
LOG.debug("Start lock acquisition for path [{}]. Timeout set to [{}]", lockPath, lockTimeout);
curatorLockHelper.acquire(lock, lockTimeout.getQuantity(), lockTimeout.getUnit());
LOG.debug("Acquired lock on path [{}]", lockPath);
}

/**
* Adds a {@link StartupJettyLifeCycleListener} in case the lock was never acquired.
*
* @param lockInfo the lock info indicating if the lock was acquired
* @param environment the Dropwizard environment used to add the listener
*/
public void addFallbackJettyStartupLifeCycleListener(StartupLockInfo lockInfo, Environment environment) {
if (lockInfo.getLockState() != StartupLockInfo.LockState.ACQUIRED) {
environment.lifecycle().addLifeCycleListener(new StartupJettyLifeCycleListener(executioner));
}
}

/**
* Cleans up the startup lock if it was acquired.
*
* @param lockInfo the lock info indicating if the lock was acquired
*/
public void releaseStartupLockIfPresent(StartupLockInfo lockInfo) {
if (lockInfo.getLockState() == StartupLockInfo.LockState.ACQUIRED) {
LOG.info("Releasing lock [{}] on path [{}]", lockInfo.getLock(), lockInfo.getLockPath());
curatorLockHelper.releaseLockQuietlyIfHeld(lockInfo.getLock());
curatorFrameworkHelper.closeIfStarted(lockInfo.getClient());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
@Slf4j
public class SystemExecutioner {

void exit() {
public void exit() {
LOG.warn("Terminating the VM!");
System.exit(1);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.kiwiproject.dropwizard.util.startup.listener;

import static org.kiwiproject.base.KiwiPreconditions.requireNotNull;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.LifeCycle;
import org.kiwiproject.dropwizard.util.startup.SystemExecutioner;

/**
* A Jetty {@link org.eclipse.jetty.util.component.LifeCycle.Listener} that shuts down the system in the case of
* a lifecycle failure.
*/
@Slf4j
public class StartupJettyLifeCycleListener extends AbstractLifeCycle.AbstractLifeCycleListener {

private final SystemExecutioner executioner;

public StartupJettyLifeCycleListener(SystemExecutioner executioner) {
this.executioner = requireNotNull(executioner);
}

@Override
public void lifeCycleFailure(LifeCycle event, Throwable cause) {
LOG.error("Jetty LifeCycleFailure with event [{}]. Exiting the JVM!", event);
executioner.exit();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.kiwiproject.dropwizard.util.startup.listener;

import static org.kiwiproject.base.KiwiPreconditions.requireNotBlank;
import static org.kiwiproject.base.KiwiPreconditions.requireNotNull;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.LifeCycle;
import org.kiwiproject.curator.CuratorFrameworkHelper;
import org.kiwiproject.curator.CuratorLockHelper;
import org.kiwiproject.dropwizard.util.startup.SystemExecutioner;

/**
* A Jetty {@link org.eclipse.jetty.util.component.LifeCycle.Listener} that releases the ZooKeeper startup lock if
* it exists. In the case of a lifecycle failure, it shuts down the system.
*/
@Slf4j
public class StartupWithLockJettyLifeCycleListener extends AbstractLifeCycle.AbstractLifeCycleListener {

private final CuratorFramework curatorFramework;
private final InterProcessLock lock;
private final String lockPath;
private final CuratorLockHelper curatorLockHelper = new CuratorLockHelper();
private final CuratorFrameworkHelper curatorFrameworkHelper = new CuratorFrameworkHelper();
private final SystemExecutioner executioner;

public StartupWithLockJettyLifeCycleListener(CuratorFramework curatorFramework, InterProcessLock lock, String lockPath, SystemExecutioner executioner) {
this.curatorFramework = requireNotNull(curatorFramework);
this.lock = requireNotNull(lock);
this.lockPath = requireNotBlank(lockPath);
this.executioner = requireNotNull(executioner);
}

@Override
public void lifeCycleFailure(LifeCycle event, Throwable cause) {
LOG.error("Jetty LifeCycleFailure with event [{}]. Releasing lock [{}] on path [{}] and exiting the JVM!", event, lock, lockPath);
releaseLockAndClose();
executioner.exit();
}

@Override
public void lifeCycleStarted(LifeCycle event) {
LOG.trace("Jetty LifeCycleStarted with event [{}]. Releasing lock [{}] on path [{}].", event, lock, lockPath);
releaseLockAndClose();
}

@Override
public void lifeCycleStopped(LifeCycle event) {
LOG.trace("Jetty LifeCycleStopped with event [{}]. Releasing lock [{}] on path [{}] if still held (which is highly unlikely).", event, lock, lockPath);
releaseLockAndClose();
}

private void releaseLockAndClose() {
curatorLockHelper.releaseLockQuietlyIfHeld(lock);
curatorFrameworkHelper.closeQuietly(curatorFramework);
}
}
Loading

0 comments on commit c8038b6

Please sign in to comment.