Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Don't assume the zk client hasn't yet connected #15

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 47 additions & 34 deletions src/main/scala/com/boundary/ordasity/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,33 +112,36 @@ class Cluster(val name: String, val listener: Listener, config: ClusterConfig)

val connectionWatcher = new Watcher {
def process(event: WatchedEvent) {
event.getState match {
case KeeperState.SyncConnected => {
log.info("ZooKeeper session established.")
connected.set(true)
try {
if (state.get() != NodeState.Shutdown)
onConnect()
else
log.info("This node is shut down. ZK connection re-established, but not relaunching.")
} catch {
case e:Exception =>
log.error(e, "Exception during zookeeper connection established callback")
// Synchronize since we are triggered by both the zk event thread and the user thread
synchronized {
event.getState match {
case KeeperState.SyncConnected => {
log.info("ZooKeeper session established.")
connected.set(true)
try {
if (state.get() != NodeState.Shutdown)
onConnect()
else
log.info("This node is shut down. ZK connection re-established, but not relaunching.")
} catch {
case e:Exception =>
log.error(e, "Exception during zookeeper connection established callback")
}
}
case KeeperState.Expired =>
log.info("ZooKeeper session expired.")
connected.set(false)
forceShutdown()
awaitReconnect()
case KeeperState.Disconnected =>
log.info("ZooKeeper session disconnected. Awaiting reconnect...")
connected.set(false)
awaitReconnect()
case x: Any =>
log.info("ZooKeeper session interrupted. Shutting down due to %s", x)
connected.set(false)
awaitReconnect()
}
case KeeperState.Expired =>
log.info("ZooKeeper session expired.")
connected.set(false)
forceShutdown()
awaitReconnect()
case KeeperState.Disconnected =>
log.info("ZooKeeper session disconnected. Awaiting reconnect...")
connected.set(false)
awaitReconnect()
case x: Any =>
log.info("ZooKeeper session interrupted. Shutting down due to %s", x)
connected.set(false)
awaitReconnect()
}
}

Expand All @@ -164,19 +167,29 @@ class Cluster(val name: String, val listener: Listener, config: ClusterConfig)
*/
def connect(injectedClient: Option[ZooKeeperClient] = None) {
if (!initialized.get) {
val hosts = config.hosts.split(",").map { server =>
val host = server.split(":")(0)
val port = Integer.parseInt(server.split(":")(1))
new InetSocketAddress(host, port)
}.toList

log.info("Connecting to hosts: %s", hosts.toString)
zk = injectedClient.getOrElse(
new ZooKeeperClient(Amount.of(config.zkTimeout, Time.MILLISECONDS), hosts))
zk = injectedClient match {
case Some(zk) =>
log.info("Using user-supplied ZooKeeper")
zk
case None =>
log.info("Using my own ZooKeeper (hosts: %s)", config.hosts)
val hosts = config.hosts.split(",").map { server =>
val host = server.split(":")(0)
val port = Integer.parseInt(server.split(":")(1))
new InetSocketAddress(host, port)
}.toList
new ZooKeeperClient(Amount.of(config.zkTimeout, Time.MILLISECONDS), hosts)
}

log.info("Ensuring ZooKeeper connection is live")
zk.get()
log.info("Simulating newly established session to kickoff cluster startup")
connectionWatcher.process(new WatchedEvent(null, KeeperState.SyncConnected, null))
log.info("Registering connection watcher.")
zk.register(connectionWatcher)
}

log.info("Ensuring ZooKeeper connection is live (still)")
zk.get()
}

Expand Down
222 changes: 141 additions & 81 deletions src/test/scala/com/boundary/ordasity/ClusterSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import com.codahale.jerkson.Json
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.ZooDefs.Ids
import com.twitter.common.zookeeper.{ZooKeeperMap, ZooKeeperClient}
import org.apache.zookeeper.{Watcher, CreateMode, ZooKeeper}
import org.apache.zookeeper.{Watcher, CreateMode, ZooKeeper, WatchedEvent}
import org.apache.zookeeper.Watcher.Event.KeeperState

class ClusterSpec extends Spec with Logging {
Logging.configure()
Expand Down Expand Up @@ -272,13 +273,13 @@ class ClusterSpec extends Spec with Logging {

cluster.registerWatchers()

cluster.nodes.isInstanceOf[ZooKeeperMap[String]].must(be(true))
cluster.allWorkUnits.isInstanceOf[ZooKeeperMap[String]].must(be(true))
cluster.workUnitMap.isInstanceOf[ZooKeeperMap[String]].must(be(true))
cluster.nodes.isInstanceOf[ZooKeeperMap[_]].must(be(true))
cluster.allWorkUnits.isInstanceOf[ZooKeeperMap[_]].must(be(true))
cluster.workUnitMap.isInstanceOf[ZooKeeperMap[_]].must(be(true))

// Not using soft handoff (TODO: assert ZKMap w/soft handoff on)
cluster.handoffRequests.isInstanceOf[HashMap[String, String]].must(be(true))
cluster.handoffResults.isInstanceOf[HashMap[String, String]].must(be(true))
cluster.handoffRequests.isInstanceOf[HashMap[_, _]].must(be(true))
cluster.handoffResults.isInstanceOf[HashMap[_, _]].must(be(true))

// TODO: Test loadMap isinstanceof zkmap with smart balancing on.
}
Expand Down Expand Up @@ -390,107 +391,166 @@ class ClusterSpec extends Spec with Logging {
verify.one(mockClusterListener).onJoin(any)
verify.one(policy).onConnect()
cluster.state.get().must(be(NodeState.Started))
cluster.watchesRegistered.set(true)
}

@Test def `connect` {
val (mockZK, mockZKClient) = getMockZK()
val policy = mock[BalancingPolicy]
cluster.balancingPolicy = policy

// Pretend that we get a SyncConnected event during our register call (synchronously)
mockZKClient.register(any).answersWith { _.getArguments match {
case Array(watcher: Watcher) => watcher.process(new WatchedEvent(null, KeeperState.SyncConnected, null))
}}

// Pretend that the paths exist for the ZooKeeperMaps we're creating
mockZK.exists(any[String], any[Watcher]).returns(mock[Stat])

cluster.connect(Some(mockZKClient))

// Apply same verifications as onConnect, as all of these should be called.
//verify.one(mockClusterListener).onJoin(any)
//verify.one(policy).onConnect()
//cluster.state.get().must(be(NodeState.Started))
//cluster.watchesRegistered.set(true)
verify.one(mockClusterListener).onJoin(any)
verify.one(policy).onConnect()
cluster.state.get().must(be(NodeState.Started))
}
}

@Test def `join` {
val (mockZK, mockZKClient) = getMockZK()
cluster.zk = mockZKClient

val policy = mock[BalancingPolicy]
cluster.balancingPolicy = policy

// Should no-op if draining.
cluster.setState(NodeState.Draining)
cluster.join().must(be(NodeState.Draining.toString))
verify.exactly(0)(mockZKClient).get()

// Should no-op if started.
cluster.setState(NodeState.Started)
cluster.join().must(be(NodeState.Started.toString))
verify.exactly(0)(mockZKClient).get()

// Pretend that the paths exist for the ZooKeeperMaps we're creating
mockZK.exists(any[String], any[Watcher]).returns(mock[Stat])

cluster.setState(NodeState.Fresh)
cluster.join().must(be(NodeState.Started.toString))

// Apply same verifications as connect, as all of these should be called.
verify.one(mockClusterListener).onJoin(any)
verify.one(policy).onConnect()
cluster.state.get().must(be(NodeState.Started))
cluster.watchesRegistered.set(true)
}


@Test def `join after shutdown` {
val (mockZK, mockZKClient) = getMockZK()
cluster.zk = mockZKClient
class `join` {
val (mockZK, mockZKClient) = getMockZK()
cluster.zk = mockZKClient

val policy = mock[BalancingPolicy]
cluster.balancingPolicy = policy
val policy = mock[BalancingPolicy]
cluster.balancingPolicy = policy

// Pretend that the paths exist for the ZooKeeperMaps we're creating
mockZK.exists(any[String], any[Watcher]).returns(mock[Stat])
// Pretend that the paths exist for any ZooKeeperMaps we might create
mockZK.exists(any[String], any[Watcher]).returns(mock[Stat])

cluster.setState(NodeState.Shutdown)
cluster.join().must(be(NodeState.Started.toString))
class `with not-yet-connected zk client` {

// Pretend that we get a SyncConnected event during our register call (synchronously)
mockZKClient.register(any).answersWith { _.getArguments match {
case Array(watcher: Watcher) => watcher.process(new WatchedEvent(null, KeeperState.SyncConnected, null))
}}

@Test def `when fresh` {
cluster.setState(NodeState.Fresh)
cluster.join(Some(mockZKClient)).must(be(NodeState.Started.toString))

// Apply same verifications as connect, as all of these should be called.
verify.one(mockClusterListener).onJoin(any)
verify.one(policy).onConnect()
cluster.state.get().must(be(NodeState.Started))
}

@Test def `after shutdown` {
cluster.setState(NodeState.Shutdown)
cluster.join(Some(mockZKClient)).must(be(NodeState.Shutdown.toString))

// Should no-op if shutdown.
verify.no(mockClusterListener).onJoin(any)
verify.no(policy).onConnect()
cluster.state.get().must(be(NodeState.Shutdown))
}

@Test def `when draining` {
cluster.setState(NodeState.Draining)
cluster.join(Some(mockZKClient)).must(be(NodeState.Draining.toString))

// Should no-op if draining.
verify.no(mockClusterListener).onJoin(any)
verify.no(policy).onConnect()
cluster.state.get().must(be(NodeState.Draining))
cluster.watchesRegistered.set(false)
}

@Test def `after started` {
cluster.setState(NodeState.Started)
cluster.join(Some(mockZKClient)).must(be(NodeState.Started.toString))

// Should no-op if already started.
verify.no(mockClusterListener).onJoin(any)
verify.no(policy).onConnect()
cluster.state.get().must(be(NodeState.Started))
cluster.watchesRegistered.set(false)
}
}

// Apply same verifications as connect, as all of these should be called.
verify.one(mockClusterListener).onJoin(any)
verify.one(policy).onConnect()
cluster.state.get().must(be(NodeState.Started))
cluster.watchesRegistered.set(true)
}
class `with already connected zk client` {

// We don't get a SyncConnected event during our register call, since the zk client is already connected

@Test def `when fresh` {
cluster.setState(NodeState.Fresh)
cluster.join(Some(mockZKClient)).must(be(NodeState.Started.toString))

// Apply same verifications as connect, as all of these should be called.
verify.one(mockClusterListener).onJoin(any)
verify.one(policy).onConnect()
cluster.state.get().must(be(NodeState.Started))
}

@Test def `after shutdown` {
cluster.setState(NodeState.Shutdown)
cluster.join(Some(mockZKClient)).must(be(NodeState.Shutdown.toString))

// Should no-op if shutdown.
verify.no(mockClusterListener).onJoin(any)
verify.no(policy).onConnect()
cluster.state.get().must(be(NodeState.Shutdown))
}

@Test def `when draining` {
cluster.setState(NodeState.Draining)
cluster.join(Some(mockZKClient)).must(be(NodeState.Draining.toString))

// Should no-op if draining.
verify.no(mockClusterListener).onJoin(any)
verify.no(policy).onConnect()
cluster.state.get().must(be(NodeState.Draining))
cluster.watchesRegistered.set(false)
}

@Test def `after started` {
cluster.setState(NodeState.Started)
cluster.join(Some(mockZKClient)).must(be(NodeState.Started.toString))

// Should no-op if already started.
verify.no(mockClusterListener).onJoin(any)
verify.no(policy).onConnect()
cluster.state.get().must(be(NodeState.Started))
cluster.watchesRegistered.set(false)
}
}
}

@Test def `cluster constructor` {
val cluster = new Cluster("foo", mockClusterListener, config)
cluster.name.must(be("foo"))
cluster.listener.must(be(mockClusterListener))
}

@Test def `cluster constructor` {
val cluster = new Cluster("foo", mockClusterListener, config)
cluster.name.must(be("foo"))
cluster.listener.must(be(mockClusterListener))
}

@Test def `getOrElse String` {
val foo = new HashMap[String, String]
foo.put("foo", "bar")

@Test def `getOrElse String` {
val foo = new HashMap[String, String]
foo.put("foo", "bar")
cluster.getOrElse(foo, "foo", "taco").must(be("bar"))
cluster.getOrElse(foo, "bar", "taco").must(be("taco"))
}

cluster.getOrElse(foo, "foo", "taco").must(be("bar"))
cluster.getOrElse(foo, "bar", "taco").must(be("taco"))
}
@Test def `getOrElse Double` {
val foo = new HashMap[String, Double]
foo.put("foo", 0.01d)
cluster.getOrElse(foo, "foo", 0.02d).must(be(0.01d))
cluster.getOrElse(foo, "bar", 0.02d).must(be(0.02d))
}

@Test def `getOrElse Double` {
val foo = new HashMap[String, Double]
foo.put("foo", 0.01d)
cluster.getOrElse(foo, "foo", 0.02d).must(be(0.01d))
cluster.getOrElse(foo, "bar", 0.02d).must(be(0.02d))
}
def getMockZK() : (ZooKeeper, ZooKeeperClient) = {
val mockZK = mock[ZooKeeper]
val mockZKClient = mock[ZooKeeperClient]
mockZKClient.get().returns(mockZK)
(mockZK, mockZKClient)
}

def getMockZK() : (ZooKeeper, ZooKeeperClient) = {
val mockZK = mock[ZooKeeper]
val mockZKClient = mock[ZooKeeperClient]
mockZKClient.get().returns(mockZK)
(mockZK, mockZKClient)
}

}