From 5676b780e75e58ebfa51e3ca354a2a61c36afb66 Mon Sep 17 00:00:00 2001 From: Tim Peoples Date: Mon, 25 Jan 2021 11:50:52 -0800 Subject: [PATCH] Start --- go.mod | 11 +++ go.sum | 14 +++ options.go | 65 +++++++++++++ runner.go | 75 +++++++++++++++ server.go | 207 +++++++++++++++++++++++++++++++++++++++++ server_test.go | 143 ++++++++++++++++++++++++++++ signals.go | 35 +++++++ test_event_test.go | 97 +++++++++++++++++++ test_logger_test.go | 26 ++++++ test_server_test.go | 85 +++++++++++++++++ test_signaller_test.go | 62 ++++++++++++ 11 files changed, 820 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 options.go create mode 100644 runner.go create mode 100644 server.go create mode 100644 server_test.go create mode 100644 signals.go create mode 100644 test_event_test.go create mode 100644 test_logger_test.go create mode 100644 test_server_test.go create mode 100644 test_signaller_test.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9299606 --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module toolman.org/net/lameduck + +go 1.14 + +require ( + github.com/kr/pretty v0.2.1 + github.com/spf13/pflag v1.0.3 + golang.org/x/sync v0.0.0-20201207232520-09787c993a3a + golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c + toolman.org/base/log/v2 v2.1.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..4c1b3c9 --- /dev/null +++ b/go.sum @@ -0,0 +1,14 @@ +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190203050204-7ae0202eb74c/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c h1:VwygUrnw9jn88c4u8GD3rZQbqrP/tgas88tPUbBxQrk= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +toolman.org/base/log/v2 v2.1.0 h1://Gx1ca5ri88Z7wTuip9Ja7GPlnvp7BlL7C1VRsmTfA= +toolman.org/base/log/v2 v2.1.0/go.mod h1:S/IHsuY72A9srk+mMzp+QcV4suGhkjlOAeHj5ADURFc= diff --git a/options.go b/options.go new file mode 100644 index 0000000..d4501d5 --- /dev/null +++ b/options.go @@ -0,0 +1,65 @@ +package lameduck + +import ( + "os" + "time" +) + +// Option is the interface implemented by types that offer optional behavior +// while running a Server with lame-duck support. +type Option interface { + set(*runner) +} + +// Period returns an Option that alters the lame-duck period to the given +// Duration. +func Period(p time.Duration) Option { + return period(p) +} + +type period time.Duration + +func (p period) set(r *runner) { + r.period = time.Duration(p) +} + +// Signals returns an Options that changes the list of Signals that trigger the +// beginning of lame-duck mode. Using this Option fully replaces the previous +// list of triggering signals. +func Signals(s ...os.Signal) Option { + return signals(s) +} + +type signals []os.Signal + +func (s signals) set(r *runner) { + r.signals = ([]os.Signal)(s) +} + +// Logger is the interface needed for the WithLogger Option. +type Logger interface { + Infof(string, ...interface{}) +} + +type loggerOption struct { + logger Logger +} + +// WithLogger returns an Option that alters this package's logging facility +// to the provided Logger. Note, the default Logger is one derived from +// 'github.com/golang/glog'. To prevent all logging, use WithoutLogger. +func WithLogger(l Logger) Option { + return &loggerOption{l} +} + +// WithoutLogger returns an option the disables all logging from this package. +func WithoutLogger() Option { + return &loggerOption{} +} + +func (o *loggerOption) set(r *runner) { + if r.logf = o.logger.Infof; r.logf == nil { + // a "silent" logger + r.logf = func(string, ...interface{}) {} + } +} diff --git a/runner.go b/runner.go new file mode 100644 index 0000000..f47e18f --- /dev/null +++ b/runner.go @@ -0,0 +1,75 @@ +package lameduck + +import ( + "errors" + "os" + "sync" + "time" + + "golang.org/x/sys/unix" + "toolman.org/base/log/v2" +) + +var ( + defaultPeriod = 3 * time.Second + defaultSignals = []os.Signal{unix.SIGINT, unix.SIGTERM} +) + +type runner struct { + server Server + period time.Duration + signals []os.Signal + logf func(string, ...interface{}) + done chan struct{} + + once sync.Once +} + +func newRunner(svr Server, options []Option) (*runner, error) { + if svr == nil { + return nil, errors.New("nil Server") + } + + r := &runner{ + server: svr, + period: defaultPeriod, + signals: defaultSignals, + logf: log.Infof, + done: make(chan struct{}), + } + + for _, o := range options { + o.set(r) + } + + if r.period <= 0 { + return nil, errors.New("lame-duck period must be greater than zero") + } + + if len(r.signals) == 0 { + return nil, errors.New("no lame-duck signals defined") + } + + return r, nil +} + +func (r *runner) close() { + if r == nil || r.done == nil { + if r != nil { + r.logf("r.done is nil !!!") + } + return + } + + var closed bool + + r.once.Do(func() { + close(r.done) + r.logf("runner closed") + closed = true + }) + + if !closed { + r.logf("runner *NOT* closed") + } +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..424de6e --- /dev/null +++ b/server.go @@ -0,0 +1,207 @@ +// Package lameduck provides coordinated lame-duck behavior for any service +// implementing this package's Server interface. +// +// By default, lame-duck mode is triggered by receipt of SIGINT or SIGTERM +// and the default lame-duck period is 3 seconds. Options are provided to +// alter these (an other) values. +// +// This package is written assuming behavior similar to the standard library's +// http.Server -- in that its Shutdown and Close methods exhibit behavior +// matching the lameduck.Server interface -- however, in order to allow other +// types to be used, a Serve method that returns nil is also needed. +// +// +// type LameDuckServer struct { +// // This embedded http.Server provides Shutdown and Close methods +// // with behavior expected by the lameduck.Server interface. +// *http.Server +// } +// +// // Serve executes ListenAndServe in a manner compliant with the +// // lameduck.Server interface. +// func (s *LameDuckServer) Serve(context.Contxt) error { +// err := s.Server.ListenAndServe() +// +// if err == http.ErrServerClosed { +// err = nil +// } +// +// return err +// } +// +// // Run will run the receiver's embedded http.Server and provide +// // lame-duck coverage on receipt of SIGTERM or SIGINT. +// func (s *LameDuckServer) Run(ctx context.Context) error { +// return lameduck.Run(ctx, s) +// } +package lameduck + +import ( + "context" + "strings" + + "golang.org/x/sync/errgroup" +) + +// Server defines the interface that should be implemented by types intended +// for lame-duck support. It is expected that these methods exhibit behavior +// similar to http.Server -- in that a call to Shutdown or Close should cause +// Serve to return immediately. +// +// However, unlike http.Server's Serve, ListenAndServe, and ListenAndServeTLS +// methods (which return http.ErrServerClosed in this situation), this Serve +// method should return a nil error when lame-duck mode is desired. +// +type Server interface { + // Serve executes the Server. If Serve returns an error, that error will be + // returned immediately by Run and no lame-duck coverage will be provided. + // + Serve(context.Context) error + + // Shutdown is called by Run (after catching one of the configured signals) + // to initiate a graceful shutdown of the Server; this marks the beginning + // of lame-duck mode. If Shutdown returns a nil error before the configured + // lame-duck period has elapsed, Run will immediately return nil as well. + // + // The Context provided to Shutdown will have a timeout set to the configured + // lame-duck Period. If Shutdown returns context.DeadlineExceeded, Run will + // return a LameDuckError with its Expired field set to true and Err set to + // the return value from calling Close. + // + // Any other error returned by Shutdown will be wrapped by a LameDuckError + // with its Expired field set to false. + Shutdown(context.Context) error + + // Close is called by Run when Shutdown returns context.DeadlineExceeded and + // its return value will be assigned to the Err field of the LameDuckError + // returned by Run. + Close() error +} + +// Run executes the given Server providing coordinated lame-duck behavior on +// reciept of one or more configurable signals. By default, the lame-duck +// period is 3s and is triggered by SIGINT or SIGTERM. Options are available +// to alter these values. +func Run(ctx context.Context, svr Server, options ...Option) error { + r, err := newRunner(svr, options) + if err != nil { + return err + } + + return r.run(ctx) +} + +func (r *runner) run(ctx context.Context) error { + eg, ctx := errgroup.WithContext(ctx) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Goroutine #1 + // + // - Waits for one of the configured signals + // - Calls Shutdown using a Context with a deadline for the configure period + // - If deadline is exceeded, returns the result of calling Close + // - Otherwise, returns the result from the call to Shutdown + // - On return, calls r.close() + // + eg.Go(func() error { + defer r.close() + + r.logf("Waiting for signals: %v", r.signals) + + sig, err := r.waitForSignal(ctx) + if err != nil { + return err + } + + r.logf("Received signal [%s]; entering lame-duck mode for %v", sig, r.period) + + ctx, cancel2 := context.WithTimeout(ctx, r.period) + defer cancel2() + + err = r.server.Shutdown(ctx) + switch err { + case nil: + r.logf("Completed lame-duck mode") + return nil + + case context.DeadlineExceeded: + r.logf("Lame-duck period has expired") + return &LameDuckError{ + Expired: true, + Err: r.server.Close(), + } + + default: + r.logf("error shutting down server: %v", err) + cancel() + return &LameDuckError{Err: err} + } + }) + + // Goroutine #2 + // + // - Calls Serve + // - If Server returns a non-nil error, return it immediately + // - Otherwise, wait for the Context or receiver to be "done". + // + eg.Go(func() error { + r.logf("Starting server") + if err := r.server.Serve(ctx); err != nil { + r.logf("Server failed: %v", err) + return err + } + + r.logf("Stopping server") + + select { + case <-ctx.Done(): + r.logf("Context canceled wait for server shutdown") + + case <-r.done: + r.logf("Server stopped") + } + + return nil + }) + + return eg.Wait() +} + +// LameDuckError is the error type returned by Run for errors related to +// lame-duck mode. +type LameDuckError struct { + Expired bool + Err error +} + +func (lde *LameDuckError) Error() string { + if lde == nil { + return "" + } + + var msgs []string + + if lde.Expired { + msgs = append(msgs, "Lame-duck period has expired") + } + + if lde.Err != nil { + if msg := lde.Err.Error(); msg != "" { + msgs = append(msgs, msg) + } + } + + if len(msgs) == 0 { + return "" + } + + return strings.Join(msgs, " + ") +} + +func (lde *LameDuckError) Unwrap() error { + if lde == nil { + return nil + } + return lde.Err +} diff --git a/server_test.go b/server_test.go new file mode 100644 index 0000000..e442e53 --- /dev/null +++ b/server_test.go @@ -0,0 +1,143 @@ +package lameduck + +import ( + "context" + "errors" + "os" + "sync" + "testing" + "time" + + "golang.org/x/sys/unix" +) + +type testcase struct { + signal os.Signal + signalAfter time.Duration + shutdownAfter time.Duration + cancelAfter time.Duration + serveError error + shutdownError error + closeError error + runOptions []Option + + want error +} + +var ( + errServeFailed = errors.New("server failed to start") + errShutdownFailed = errors.New("server failed to shutdown") +) + +func TestRun(t *testing.T) { + // Test cases are generated from a series of ordered events that should + // result in a specific outcome (as declared by 'wantError'). If no error + // is declared, a nil error is expected. + cases := map[string]*testcase{ + "normal": mkCase(sendSignal(unix.SIGTERM), shutdownReturn(nil), lameDuckExpires), + "expired": mkCase(sendSignal(unix.SIGTERM), lameDuckExpires, shutdownReturn(nil), wantError(&LameDuckError{Expired: true})), + "canceled": mkCase(cancelContext, wantError(context.Canceled)), + "nostart": mkCase(serveReturn(errServeFailed), wantError(errServeFailed)), + "badstop": mkCase(sendSignal(unix.SIGTERM), shutdownReturn(errShutdownFailed), lameDuckExpires, wantError(&LameDuckError{Err: errShutdownFailed})), + } + + for label, tc := range cases { + t.Run(label, tc.test) + } +} + +func (tc *testcase) test(t *testing.T) { + if tc.cancelAfter == 0 && tc.signalAfter == 0 && tc.serveError == nil { + t.Fatal("Invalid testcase: must set one of 'cancelAfter', 'signalAfter', or 'serveError'") + } + + ts := injectSignaller() + defer ts.revert() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tl := &testLogger{t.Logf} + + tc.runOptions = append(tc.runOptions, WithLogger(tl)) + + svr := newTestServer(tl, tc.serveError, tc.shutdownError, tc.closeError) + + errs := make(chan error) + + var wg sync.WaitGroup + + wg.Add(1) + + go func() { + defer wg.Done() + defer close(errs) + + if err := Run(ctx, svr, tc.runOptions...); err != nil { + tl.Infof("Run error: %v", err) + errs <- err + } else { + tl.Infof("Server Run Successful") + } + }() + + if tc.cancelAfter != 0 { + wg.Add(1) + tl.Infof("Will cancel context after %v", tc.cancelAfter) + time.AfterFunc(tc.cancelAfter, func() { + defer wg.Done() + tl.Infof("Cancelling Context") + cancel() + }) + } + + if tc.signal != nil && tc.signalAfter != 0 { + wg.Add(1) + tl.Infof("Will emit signal %q after %v", tc.signal, tc.signalAfter) + time.AfterFunc(tc.signalAfter, func() { + defer wg.Done() + tl.Infof("Emitting signal: %v", tc.signal) + ts.emit(tc.signal) + }) + } + + if tc.shutdownAfter != 0 { + wg.Add(1) + tl.Infof("Will finish Shutdown after %v", tc.shutdownAfter) + time.AfterFunc(tc.shutdownAfter, func() { + defer wg.Done() + tl.Infof("Finishing server Shutdown method") + svr.shutdown.finish() + }) + } + + if got := <-errs; !tc.isWanted(got) { + t.Errorf("Run(ctx, svr) == (%v); wanted (%v)", got, tc.want) + } + + wg.Wait() +} + +func (tc *testcase) isWanted(got error) bool { + if lde, ok := tc.want.(*LameDuckError); ok { + return lde.isEqual(got) + } + + return got == tc.want +} + +// A convenience method added to type LameDuckError (only during testing). +func (lde *LameDuckError) isEqual(err error) bool { + switch { + case lde == nil && err == nil: + return true + case lde == nil || err == nil: + return false + } + + if olde, ok := err.(*LameDuckError); ok { + return lde.Expired == olde.Expired && lde.Err == olde.Err + } + + return false +} diff --git a/signals.go b/signals.go new file mode 100644 index 0000000..370cb7a --- /dev/null +++ b/signals.go @@ -0,0 +1,35 @@ +package lameduck + +import ( + "context" + "os" + "os/signal" +) + +type osSignals struct{} + +func (*osSignals) notify(c chan<- os.Signal, sig ...os.Signal) { signal.Notify(c, sig...) } +func (*osSignals) stop(c chan<- os.Signal) { signal.Stop(c) } + +var sig signaler = new(osSignals) + +type signaler interface { + notify(chan<- os.Signal, ...os.Signal) + stop(chan<- os.Signal) +} + +func (r *runner) waitForSignal(ctx context.Context) (os.Signal, error) { + ch := make(chan os.Signal, 1) + defer close(ch) + + sig.notify(ch, r.signals...) + defer sig.stop(ch) + + select { + case <-ctx.Done(): + return nil, ctx.Err() + + case sig := <-ch: + return sig, nil + } +} diff --git a/test_event_test.go b/test_event_test.go new file mode 100644 index 0000000..5b3797f --- /dev/null +++ b/test_event_test.go @@ -0,0 +1,97 @@ +package lameduck + +import ( + "os" + "time" +) + +// NOTE: This file contains no tests of its own. +// +// Instead it provides the definitions for testEvent and evtType. These +// are used by the mkCase function to generate test cases from a series +// of ordered events. +// + +type evtType int + +const ( + evtNone evtType = iota + evtCancelContext + evtCloseReturn + evtLameDuckExpires + evtSendSignal + evtServeReturn + evtShutdownReturn + evtWantError +) + +type testEvent struct { + etype evtType + signal os.Signal + err error +} + +var ( + lameDuckExpires = testEvent{etype: evtLameDuckExpires} + cancelContext = testEvent{etype: evtCancelContext} +) + +func closeReturn(err error) testEvent { return testEvent{etype: evtCloseReturn, err: err} } +func sendSignal(sig os.Signal) testEvent { return testEvent{etype: evtSendSignal, signal: sig} } +func serveReturn(err error) testEvent { return testEvent{etype: evtServeReturn, err: err} } +func shutdownReturn(err error) testEvent { return testEvent{etype: evtShutdownReturn, err: err} } +func wantError(err error) testEvent { return testEvent{etype: evtWantError, err: err} } + +// mkCase converts a series of ordered testEvents into a testcase. +func mkCase(evts ...testEvent) *testcase { + tc := new(testcase) + + var ld time.Duration + + gap := 10 * time.Millisecond + after := gap + + for _, e := range evts { + switch e.etype { + case evtCancelContext: + tc.cancelAfter = after + after += gap + + case evtCloseReturn: + tc.closeError = e.err + + case evtLameDuckExpires: + ld = after + after += gap + + case evtSendSignal: + tc.signal = e.signal + tc.signalAfter = after + after += gap + + case evtServeReturn: + tc.serveError = e.err + + case evtShutdownReturn: + tc.shutdownError = e.err + tc.shutdownAfter = after + after += gap + if ld != 0 { + after += gap / 2 + } + + case evtWantError: + tc.want = e.err + } + } + + if ld < tc.signalAfter { + panic("cannot expire lame-duck before sending signal") + } + + if ld > 0 { + tc.runOptions = append(tc.runOptions, Period(ld-tc.signalAfter)) + } + + return tc +} diff --git a/test_logger_test.go b/test_logger_test.go new file mode 100644 index 0000000..aa2b72d --- /dev/null +++ b/test_logger_test.go @@ -0,0 +1,26 @@ +package lameduck + +import ( + "time" + + "github.com/spf13/pflag" +) + +// NOTE: This file contains no tests of its own. +// +// Instead, it provides the definition for type testLogger -- used to +// inject T.Logf (from package testing) as a lameduck.Logger. + +func init() { + // To stifle warnings from logger. + pflag.Parse() +} + +type testLogger struct { + logf func(string, ...interface{}) +} + +func (tl *testLogger) Infof(msg string, args ...interface{}) { + ts := time.Now().Format("[15:04:05.000000] ") + tl.logf(ts+msg, args...) +} diff --git a/test_server_test.go b/test_server_test.go new file mode 100644 index 0000000..6e775a6 --- /dev/null +++ b/test_server_test.go @@ -0,0 +1,85 @@ +package lameduck + +import ( + "context" + "sync" +) + +// NOTE: This file contains no tests of its own. +// +// Instead, it contains the definition for type testServer - a test +// object implementing this package's Server interface. +// +type testServer struct { + logger Logger + serve *gate + shutdown *gate + close *gate +} + +func newTestServer(logger Logger, serveErr, shutdownErr, closeErr error) *testServer { + serveBlocking := true + if serveErr != nil { + serveBlocking = false + } + + ts := &testServer{ + logger: logger, + serve: newGate(serveErr, serveBlocking), + shutdown: newGate(shutdownErr, true), + close: newGate(closeErr, false), + } + + return ts +} + +func (ts *testServer) Serve(ctx context.Context) error { + err := ts.serve.wait(ctx) + ts.logger.Infof("Serve returned: %v", err) + return err +} + +func (ts *testServer) Shutdown(ctx context.Context) error { + ts.serve.finish() + return ts.shutdown.wait(ctx) +} + +func (ts *testServer) Close() error { + ts.serve.finish() + return ts.close.wait(context.Background()) +} + +type gate struct { + err error + done chan struct{} + once sync.Once +} + +func newGate(err error, blocking bool) *gate { + g := &gate{err: err} + if blocking { + g.done = make(chan struct{}) + } + + return g +} + +func (g *gate) wait(ctx context.Context) error { + if g.done == nil { + return g.err + } + + select { + case <-ctx.Done(): + return ctx.Err() + + case <-g.done: + return g.err + } +} + +func (g *gate) finish() { + if g != nil && g.done != nil { + g.once.Do(func() { close(g.done) }) + } +} diff --git a/test_signaller_test.go b/test_signaller_test.go new file mode 100644 index 0000000..2a9d939 --- /dev/null +++ b/test_signaller_test.go @@ -0,0 +1,62 @@ +package lameduck + +import ( + "os" +) + +// NOTE: This file contains no tests of its own. +// +// Instead, it contains the testSignaller implemenation for emitting fake +// signals into the main code base without the need for actually using +// real (OS level) signaling. +// + +type testSignaller struct { + orig signaler + sigs map[os.Signal]bool + ch chan<- os.Signal +} + +// injectSignaller injects and returns a new testSignaller into the main codebase. +func injectSignaller() *testSignaller { + ts := &testSignaller{orig: sig} + sig = ts + return ts +} + +// emit sends a Signal through the testSignaller. +func (ts *testSignaller) emit(s os.Signal) { + if ts == nil || ts.ch == nil || ts.sigs == nil || !ts.sigs[s] { + return + } + + go func() { ts.ch <- s }() +} + +// revert replaces an injected testSignaller with the signaler in place at the +// time of injection. +func (ts *testSignaller) revert() { + if ts != nil && ts.orig != nil { + sig = ts.orig + } +} + +// notify contributes to the signaler interface +func (ts *testSignaller) notify(c chan<- os.Signal, sig ...os.Signal) { + ts.ch = c + + if ts.sigs == nil { + ts.sigs = make(map[os.Signal]bool) + } + + for _, s := range sig { + ts.sigs[s] = true + } +} + +// stop contributes to the signaler interface +func (ts *testSignaller) stop(c chan<- os.Signal) { + if ts != nil && ts.ch == c { + ts.sigs = nil + } +}