Skip to content

Commit

Permalink
Merge pull request coreos#1496 from endocode/antrik/fix-shutdown-rebased
Browse files Browse the repository at this point in the history
server: fix panic on graceful shutdown
  • Loading branch information
jonboulle committed Mar 10, 2016
2 parents 907ba12 + 40691b5 commit 83c4099
Show file tree
Hide file tree
Showing 15 changed files with 210 additions and 56 deletions.
4 changes: 2 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ func (a *Agent) MarshalJSON() ([]byte, error) {

// Heartbeat updates the Registry periodically with an acknowledgement of the
// Jobs this Agent is expected to be running.
func (a *Agent) Heartbeat(stop chan bool) {
func (a *Agent) Heartbeat(stop <-chan struct{}) {
a.heartbeatJobs(a.ttl, stop)
}

func (a *Agent) heartbeatJobs(ttl time.Duration, stop chan bool) {
func (a *Agent) heartbeatJobs(ttl time.Duration, stop <-chan struct{}) {
heartbeat := func() {
machID := a.Machine.State().ID
launched := a.cache.launchedJobs()
Expand Down
2 changes: 1 addition & 1 deletion agent/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type AgentReconciler struct {
// Run periodically attempts to reconcile the provided Agent until the stop
// channel is closed. Run will also reconcile in reaction to events on the
// AgentReconciler's rStream.
func (ar *AgentReconciler) Run(a *Agent, stop chan bool) {
func (ar *AgentReconciler) Run(a *Agent, stop <-chan struct{}) {
reconcile := func() {
start := time.Now()
ar.Reconcile(a)
Expand Down
2 changes: 1 addition & 1 deletion agent/unit_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type UnitStatePublisher struct {
// Run caches all of the heartbeat objects from the provided channel, publishing
// them to the Registry every 5s. Heartbeat objects are also published as they
// are received on the channel.
func (p *UnitStatePublisher) Run(beatchan <-chan *unit.UnitStateHeartbeat, stop chan bool) {
func (p *UnitStatePublisher) Run(beatchan <-chan *unit.UnitStateHeartbeat, stop <-chan struct{}) {
go func() {
for {
select {
Expand Down
6 changes: 3 additions & 3 deletions agent/unit_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func TestUnitStatePublisherRunTiming(t *testing.T) {
}

bc := make(chan *unit.UnitStateHeartbeat)
sc := make(chan bool)
sc := make(chan struct{})
go func() {
usp.Run(bc, sc)
}()
Expand Down Expand Up @@ -466,7 +466,7 @@ func TestUnitStatePublisherRunQueuing(t *testing.T) {
clock: clockwork.NewFakeClock(),
}
bc := make(chan *unit.UnitStateHeartbeat)
sc := make(chan bool)
sc := make(chan struct{})
go func() {
usp.Run(bc, sc)
}()
Expand Down Expand Up @@ -599,7 +599,7 @@ func TestUnitStatePublisherRunWithDelays(t *testing.T) {
}

bc := make(chan *unit.UnitStateHeartbeat)
sc := make(chan bool)
sc := make(chan struct{})

wgs.Add(numPublishers)
wgf.Add(numPublishers)
Expand Down
2 changes: 1 addition & 1 deletion api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *Server) Serve() {
// Available switches the Server's HTTP handler from a generic 503 Unavailable
// response to the actual API. Once the provided channel is closed, the API is
// torn back down and 503 responses are served.
func (s *Server) Available(stop chan bool) {
func (s *Server) Available(stop <-chan struct{}) {
s.cur = s.api
<-stop
s.cur = unavailable
Expand Down
2 changes: 1 addition & 1 deletion engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func New(reg *registry.EtcdRegistry, lManager lease.Manager, rStream pkg.EventSt
}
}

func (e *Engine) Run(ival time.Duration, stop chan bool) {
func (e *Engine) Run(ival time.Duration, stop <-chan struct{}) {
leaseTTL := ival * 5
machID := e.machine.State().ID

Expand Down
17 changes: 15 additions & 2 deletions fleetd/fleetd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"

"github.com/coreos/fleet/Godeps/_workspace/src/github.com/rakyll/globalconf"
Expand Down Expand Up @@ -104,16 +105,21 @@ func main() {
}
srv.Run()

srvMutex := sync.Mutex{}

reconfigure := func() {
log.Infof("Reloading configuration from %s", *cfgPath)

srvMutex.Lock()
defer srvMutex.Unlock()

cfg, err := getConfig(cfgset, *cfgPath)
if err != nil {
log.Fatalf(err.Error())
}

log.Infof("Restarting server components")
srv.Stop()
srv.Kill()

srv, err = server.New(*cfg)
if err != nil {
Expand All @@ -124,14 +130,21 @@ func main() {

shutdown := func() {
log.Infof("Gracefully shutting down")
srv.Stop()

srvMutex.Lock()
defer srvMutex.Unlock()

srv.Kill()
srv.Purge()
os.Exit(0)
}

writeState := func() {
log.Infof("Dumping server state")

srvMutex.Lock()
defer srvMutex.Unlock()

encoded, err := json.Marshal(srv)
if err != nil {
log.Errorf("Failed to dump server state: %v", err)
Expand Down
102 changes: 102 additions & 0 deletions functional/shutdown_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package functional

import (
"strings"
"testing"

"github.com/coreos/fleet/functional/platform"
)

// Check clean shutdown of fleetd under normal circumstances
func TestShutdown(t *testing.T) {
cluster, err := platform.NewNspawnCluster("smoke")
if err != nil {
t.Fatal(err)
}
defer cluster.Destroy()

m0, err := cluster.CreateMember()
if err != nil {
t.Fatal(err)
}
_, err = cluster.WaitForNMachines(m0, 1)
if err != nil {
t.Fatal(err)
}

// Stop the fleet process.
if _, err = cluster.MemberCommand(m0, "sudo", "systemctl", "stop", "fleet"); err != nil {
t.Fatal(err)
}

// Check expected state after stop.
stdout, _ := cluster.MemberCommand(m0, "sudo", "systemctl", "show", "--property=ActiveState", "fleet")
if strings.TrimSpace(stdout) != "ActiveState=inactive" {
t.Fatalf("Fleet unit not reported as inactive: %s", stdout)
}
stdout, _ = cluster.MemberCommand(m0, "sudo", "systemctl", "show", "--property=Result", "fleet")
if strings.TrimSpace(stdout) != "Result=success" {
t.Fatalf("Result for fleet unit not reported as success: %s", stdout)
}
}

// Check clean shutdown of fleetd while automatic restart (after failed health check) is in progress
func TestShutdownVsMonitor(t *testing.T) {
cluster, err := platform.NewNspawnCluster("smoke")
if err != nil {
t.Fatal(err)
}
defer cluster.Destroy()

m0, err := cluster.CreateMember()
if err != nil {
t.Fatal(err)
}
_, err = cluster.WaitForNMachines(m0, 1)
if err != nil {
t.Fatal(err)
}

// Cut connection to etcd.
//
// This will result in a failed health check, and consequently the monitor will attempt a restart.
if _, err = cluster.MemberCommand(m0, "sudo", "iptables", "-I", "OUTPUT", "-p", "tcp", "-m", "multiport", "--dports=2379,4001", "-j", "DROP"); err != nil {
t.Fatal(err)
}

// Wait for the monitor to trigger the restart.
//
// This will never complete, as long as there is no connectivity.
if _, err = cluster.MemberCommand(m0, "sudo", "sh", "-c", `'until journalctl -u fleet | grep -q "Server monitor triggered: Monitor timed out before successful heartbeat"; do sleep 1; done'`); err != nil {
t.Fatal(err)
}

// Stop fleetd while the restart is still in progress.
if _, err = cluster.MemberCommand(m0, "sudo", "systemctl", "stop", "fleet"); err != nil {
t.Fatal(err)
}

// Verify that fleetd was shut down cleanly in spite of the concurrent restart.
stdout, _ := cluster.MemberCommand(m0, "sudo", "systemctl", "show", "--property=ActiveState", "fleet")
if strings.TrimSpace(stdout) != "ActiveState=inactive" {
t.Fatalf("Fleet unit not reported as inactive: %s", stdout)
}
stdout, _ = cluster.MemberCommand(m0, "sudo", "systemctl", "show", "--property=Result", "fleet")
if strings.TrimSpace(stdout) != "Result=success" {
t.Fatalf("Result for fleet unit not reported as success: %s", stdout)
}
}
1 change: 1 addition & 0 deletions functional/util/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ write_files:
etcd_servers=[{{printf "%q" .EtcdEndpoint}}]
etcd_key_prefix={{.EtcdKeyPrefix}}
public_ip={{.IP}}
agent_ttl=3s
ssh_authorized_keys:
- {{printf "%q" .PublicKey}}
Expand Down
2 changes: 1 addition & 1 deletion machine/coreos.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (m *CoreOSMachine) Refresh() {

// PeriodicRefresh updates the current state of the CoreOSMachine at the
// interval indicated. Operation ceases when the provided channel is closed.
func (m *CoreOSMachine) PeriodicRefresh(interval time.Duration, stop chan bool) {
func (m *CoreOSMachine) PeriodicRefresh(interval time.Duration, stop <-chan struct{}) {
ticker := time.NewTicker(interval)
for {
select {
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type EventStream interface {
}

type PeriodicReconciler interface {
Run(stop chan bool)
Run(stop <-chan struct{})
}

// NewPeriodicReconciler creates a PeriodicReconciler that will run recFunc at least every
Expand All @@ -53,7 +53,7 @@ type reconciler struct {
clock clockwork.Clock
}

func (r *reconciler) Run(stop chan bool) {
func (r *reconciler) Run(stop <-chan struct{}) {
trigger := make(chan struct{})
go func() {
abort := make(chan struct{})
Expand Down
6 changes: 3 additions & 3 deletions pkg/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ func TestPeriodicReconcilerRun(t *testing.T) {
clock: fclock,
}
// launch the PeriodicReconciler in the background
prDone := make(chan bool)
stop := make(chan bool)
prDone := make(chan struct{})
stop := make(chan struct{})
go func() {
pr.Run(stop)
prDone <- true
close(prDone)
}()
// reconcile should have occurred once at start-up
select {
Expand Down
26 changes: 14 additions & 12 deletions heart/monitor.go → server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package heart
package server

import (
"errors"
"time"

"github.com/coreos/fleet/heart"
"github.com/coreos/fleet/log"
)

Expand All @@ -30,29 +31,30 @@ type Monitor struct {
ival time.Duration
}

// Monitor ensures a Heart is still beating until a channel is closed, returning
// an error if the heartbeats fail.
func (m *Monitor) Monitor(hrt Heart, stop chan bool) error {
// Monitor periodically checks the given Heart to make sure it
// beats successfully. If the heartbeat check fails for any
// reason, an error is returned. If the supplied channel is
// closed, Monitor returns true and a nil error.
func (m *Monitor) Monitor(hrt heart.Heart, sdc <-chan struct{}) (bool, error) {
ticker := time.Tick(m.ival)
for {
select {
case <-stop:
log.Debug("Monitor exiting due to stop signal")
return nil
case <-sdc:
return true, nil
case <-ticker:
if _, err := m.check(hrt); err != nil {
return err
if _, err := check(hrt, m.TTL); err != nil {
return false, err
}
}
}
}

// check attempts to beat a Heart several times within a timeout, returning the
// log index at which the beat succeeded or an error
func (m *Monitor) check(hrt Heart) (idx uint64, err error) {
func check(hrt heart.Heart, ttl time.Duration) (idx uint64, err error) {
// time out after a third of the machine presence TTL, attempting
// the heartbeat up to four times
timeout := m.TTL / 3
timeout := ttl / 3
interval := timeout / 4

tchan := time.After(timeout)
Expand All @@ -63,7 +65,7 @@ func (m *Monitor) check(hrt Heart) (idx uint64, err error) {
err = errors.New("Monitor timed out before successful heartbeat")
return
case <-next:
idx, err = hrt.Beat(m.TTL)
idx, err = hrt.Beat(ttl)
if err != nil {
log.Debugf("Monitor heartbeat function returned err, retrying in %v: %v", interval, err)
}
Expand Down
Loading

0 comments on commit 83c4099

Please sign in to comment.