Skip to content

Commit

Permalink
Update logic from code review suggestions
Browse files Browse the repository at this point in the history
Closes #5
  • Loading branch information
chrisrohr committed Nov 28, 2020
1 parent 76dda10 commit de37e32
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 87 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package org.kiwiproject.dropwizard.util.startup;

import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static org.kiwiproject.base.KiwiPreconditions.requireNotNull;
import static org.kiwiproject.base.KiwiStrings.format;

import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.setup.Environment;
import io.dropwizard.util.Duration;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -30,38 +26,42 @@
* Utilities to acquire and release a lock from ZooKeeper during startup of the service
*/
@Slf4j
@Getter(AccessLevel.PACKAGE) // For testing
public class StartupLocker {

@Getter(AccessLevel.PACKAGE)
@VisibleForTesting
private final ZooKeeperAvailabilityChecker zkAvailabilityChecker;

@Getter(AccessLevel.PACKAGE)
@VisibleForTesting
private final CuratorFrameworkHelper curatorFrameworkHelper;

@Getter(AccessLevel.PACKAGE)
@VisibleForTesting
private final CuratorLockHelper curatorLockHelper;

private final SystemExecutioner executioner;

@Builder
private StartupLocker(SystemExecutioner executioner,
ZooKeeperAvailabilityChecker zkAvailabilityChecker,
CuratorFrameworkHelper curatorFrameworkHelper,
CuratorLockHelper curatorLockHelper) {
this.executioner = requireNotNull(executioner, "SystemExecutioner is required");
this.zkAvailabilityChecker = Optional.ofNullable(zkAvailabilityChecker).orElse(new ZooKeeperAvailabilityChecker());
this.curatorFrameworkHelper = Optional.ofNullable(curatorFrameworkHelper).orElse(new CuratorFrameworkHelper());
this.curatorLockHelper = Optional.ofNullable(curatorLockHelper).orElse(new CuratorLockHelper());
this.executioner = Optional.ofNullable(executioner).orElseGet(SystemExecutioner::new);
this.zkAvailabilityChecker = Optional.ofNullable(zkAvailabilityChecker).orElseGet(ZooKeeperAvailabilityChecker::new);
this.curatorFrameworkHelper = Optional.ofNullable(curatorFrameworkHelper).orElseGet(CuratorFrameworkHelper::new);
this.curatorLockHelper = Optional.ofNullable(curatorLockHelper).orElseGet(CuratorLockHelper::new);
}

@AllArgsConstructor
/**
* Model object containing information on the state of the acquired lock and necessary objects needed to release the lock.
*/
@Getter
@Builder
public static class StartupLockInfo {
final CuratorFramework client;
final InterProcessLock lock;
final String lockPath;

public enum LockState {
NOT_ATTEMPTED, ACQUIRED, ACQUIRE_FAIL
}

CuratorFramework client;
InterProcessLock lock;
String lockPath;
LockState lockState;
String infoMessage;
Exception exception;
}

/**
Expand All @@ -71,52 +71,67 @@ public static class StartupLockInfo {
* @param lockTimeout the amount of time to wait for the lock to be acquired
* @param curatorConfig the Curator configuration
* @param environment the Dropwizard environment
* @return information about the acquired lock or null if lock could not be acquired
* @return information about the acquired lock or {@link Optional#empty()} if lock could not be acquired
*/
public StartupLockInfo acquireStartupLock(String lockPath, Duration lockTimeout, PortAssignment assignment, CuratorConfig curatorConfig, Environment environment) {
public StartupLockInfo acquireStartupLock(String lockPath,
Duration lockTimeout,
PortAssignment assignment,
CuratorConfig curatorConfig,
Environment environment) {

if (assignment == PortAssignment.STATIC) {
return null;
return StartupLockInfo.builder()
.lockState(StartupLockInfo.LockState.NOT_ATTEMPTED)
.infoMessage("Using static port assignment. Lock not needed.")
.build();
}

if (zkAvailabilityChecker.anyZooKeepersAvailable(curatorConfig)) {
var curatorFramework = curatorFrameworkHelper.startCuratorFramework(curatorConfig);
var lock = curatorLockHelper.createInterProcessMutex(curatorFramework, lockPath);

if (tryAcquireStartupLock(lock, lockPath, lockTimeout)) {
try {
tryAcquireStartupLock(lock, lockPath, lockTimeout);
environment.lifecycle().addLifeCycleListener(new StartupWithLockJettyLifeCycleListener(curatorFramework, lock, lockPath, executioner));
return new StartupLockInfo(curatorFramework, lock, lockPath);
} else {
return StartupLockInfo.builder()
.client(curatorFramework)
.lock(lock)
.lockPath(lockPath)
.lockState(StartupLockInfo.LockState.ACQUIRED)
.infoMessage("Lock acquired")
.build();
} catch (LockAcquisitionException e) {
LOG.warn("Lock on path [{}] not obtained. Closing Curator.", lockPath);
curatorFrameworkHelper.closeQuietly(curatorFramework);

return StartupLockInfo.builder()
.lockState(StartupLockInfo.LockState.ACQUIRE_FAIL)
.infoMessage("Failed to obtain startup lock")
.exception(e)
.build();
}
} else {
LOG.warn("No ZooKeepers are available from connect string [{}]", curatorConfig.getZkConnectString());
}

LOG.warn("Startup using dynamic ports will continue without ZooKeeper lock (which may result in port conflicts)");
return null;
return StartupLockInfo.builder()
.lockState(StartupLockInfo.LockState.NOT_ATTEMPTED)
.infoMessage(format("No ZooKeepers are available from connect string [{}]", curatorConfig.getZkConnectString()))
.build();
}

private boolean tryAcquireStartupLock(InterProcessMutex lock, String lockPath, Duration lockTimeout) {
try {
LOG.debug("Start lock acquisition for path [{}]. Timeout set to [{}]", lockPath, lockTimeout);
curatorLockHelper.acquire(lock, lockTimeout.getQuantity(), lockTimeout.getUnit());
LOG.debug("Acquired lock on path [{}]", lockPath);
return true;
} catch (LockAcquisitionException e) {
LOG.warn("Failed to obtain startup lock. Allow startup to continue and maybe we will be lucky with port assignment!", e);
return false;
}
private void tryAcquireStartupLock(InterProcessMutex lock, String lockPath, Duration lockTimeout) {
LOG.debug("Start lock acquisition for path [{}]. Timeout set to [{}]", lockPath, lockTimeout);
curatorLockHelper.acquire(lock, lockTimeout.getQuantity(), lockTimeout.getUnit());
LOG.debug("Acquired lock on path [{}]", lockPath);
}

/**
* Adds a {@link StartupJettyLifeCycleListener} in case the lock was never acquired.
*
* @param lockInfo the lock info indicating if the lock was acquired
* @param environment the Dropwizard environment used to add the listener
* @param lockInfo the lock info indicating if the lock was acquired
* @param environment the Dropwizard environment used to add the listener
*/
public void addFallbackJettyStartupLifeCycleListener(StartupLockInfo lockInfo, Environment environment) {
if (isNull(lockInfo)) {
if (lockInfo.getLockState() != StartupLockInfo.LockState.ACQUIRED) {
environment.lifecycle().addLifeCycleListener(new StartupJettyLifeCycleListener(executioner));
}
}
Expand All @@ -127,7 +142,7 @@ public void addFallbackJettyStartupLifeCycleListener(StartupLockInfo lockInfo, E
* @param lockInfo the lock info indicating if the lock was acquired
*/
public void releaseStartupLockIfPresent(StartupLockInfo lockInfo) {
if (nonNull(lockInfo)) {
if (lockInfo.getLockState() == StartupLockInfo.LockState.ACQUIRED) {
LOG.warn("Due to exception caught running app [{}], early releasing lock [{}] on path [{}]", this, lockInfo.lock, lockInfo.lockPath);
curatorLockHelper.releaseLockQuietlyIfHeld(lockInfo.lock);
curatorFrameworkHelper.closeIfStarted(lockInfo.client);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.kiwiproject.dropwizard.util.startup.listener;

import static org.kiwiproject.base.KiwiPreconditions.requireNotNull;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.LifeCycle;
Expand All @@ -15,7 +17,7 @@ public class StartupJettyLifeCycleListener extends AbstractLifeCycle.AbstractLif
private final SystemExecutioner executioner;

public StartupJettyLifeCycleListener(SystemExecutioner executioner) {
this.executioner = executioner;
this.executioner = requireNotNull(executioner);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.kiwiproject.dropwizard.util.startup.listener;

import static org.kiwiproject.base.KiwiPreconditions.requireNotBlank;
import static org.kiwiproject.base.KiwiPreconditions.requireNotNull;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
Expand All @@ -24,10 +27,10 @@ public class StartupWithLockJettyLifeCycleListener extends AbstractLifeCycle.Abs
private final SystemExecutioner executioner;

public StartupWithLockJettyLifeCycleListener(CuratorFramework curatorFramework, InterProcessLock lock, String lockPath, SystemExecutioner executioner) {
this.curatorFramework = curatorFramework;
this.lock = lock;
this.lockPath = lockPath;
this.executioner = executioner;
this.curatorFramework = requireNotNull(curatorFramework);
this.lock = requireNotNull(lock);
this.lockPath = requireNotBlank(lockPath);
this.executioner = requireNotNull(executioner);
}

@Override
Expand Down
Loading

0 comments on commit de37e32

Please sign in to comment.