Skip to content

Commit

Permalink
Flush buffered decision logs on graceful shutdown
Browse files Browse the repository at this point in the history
..re-attempting until either the graceful shutdown period is over or all logs have been uploaded.

Fixes open-policy-agent#780

Signed-off-by: Anders Eknert <[email protected]>
  • Loading branch information
anderseknert committed Oct 14, 2020
1 parent 8dd0896 commit a467fe9
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 3 deletions.
36 changes: 36 additions & 0 deletions plugins/logs/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,48 @@ func (p *Plugin) Start(ctx context.Context) error {
// Stop stops the plugin.
func (p *Plugin) Stop(ctx context.Context) {
p.logInfo("Stopping decision logger.")

gracefulDeadline, _ := ctx.Deadline()
gracefulShutdownPeriod := gracefulDeadline.Sub(time.Now())

if p.config.Service != "" && gracefulShutdownPeriod > 0 {
p.flushDecisions(context.WithTimeout(ctx, gracefulShutdownPeriod))
}

done := make(chan struct{})
p.stop <- done
_ = <-done
p.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady})
}

func (p *Plugin) flushDecisions(ctx context.Context, cancel context.CancelFunc) {
p.logInfo("Flushing decision logs.")
defer cancel()

go func(ctx context.Context, cancel context.CancelFunc) {
for {
ok, err := p.oneShot(ctx)
if err != nil {
p.logError("%v.", err)
} else if ok {
cancel()
}
// Wait some before retrying, but skip incrementing interval since we are shutting down
time.Sleep(1 * time.Second)
}
}(ctx, cancel)

select {
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
p.logError("Graceful shutdown period ended with decisions possibly still in buffer.")
case context.Canceled:
p.logInfo("All decisions in buffer uploaded.")
}
}
}

// Log appends a decision log event to the buffer for uploading.
func (p *Plugin) Log(ctx context.Context, decision *server.Info) error {

Expand Down
70 changes: 69 additions & 1 deletion plugins/logs/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,74 @@ func TestPluginRequeBufferPreserved(t *testing.T) {
}
}

func TestPluginGracefulShutdownFlushesDecisions(t *testing.T) {
ctx := context.Background()

fixture := newTestFixture(t)
defer fixture.server.stop()

fixture.server.ch = make(chan []EventV1, 8)

if err := fixture.plugin.Start(ctx); err != nil {
t.Fatal(err)
}

var input interface{} = map[string]interface{}{"method": "GET"}
var result interface{} = false

logsSent := 200
for i := 0; i < logsSent; i++ {
input = generateInputMap(i)
_ = fixture.plugin.Log(ctx, logServerInfo("abc", input, result))
}

fixture.server.expCode = 200

timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
fixture.plugin.Stop(timeoutCtx)

close(fixture.server.ch)
logsReceived := 0
for element := range fixture.server.ch {
logsReceived += len(element)
}

if logsReceived != logsSent {
t.Fatalf("Expected %v, got %v", logsSent, logsReceived)
}
}

func TestPluginTerminatesImmediatelyAfterGracefulShutdownPeriod(t *testing.T) {
ctx := context.Background()

fixture := newTestFixture(t)
defer fixture.server.stop()

fixture.server.ch = make(chan []EventV1, 1)

if err := fixture.plugin.Start(ctx); err != nil {
t.Fatal(err)
}

var input interface{} = map[string]interface{}{"method": "GET"}
var result interface{} = false

input = generateInputMap(0)
_ = fixture.plugin.Log(ctx, logServerInfo("abc", input, result))

fixture.server.expCode = 500

timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Millisecond)
defer cancel()

timeBefore := time.Now()
fixture.plugin.Stop(timeoutCtx)
if time.Since(timeBefore).Milliseconds() > 100 {
t.Fatal("Expected forceful shutdown to be instantaneous.")
}
}

func TestPluginReconfigure(t *testing.T) {

ctx := context.Background()
Expand Down Expand Up @@ -866,7 +934,7 @@ func newTestFixture(t *testing.T, options ...testPluginCustomizer) testFixture {
}
]}`, ts.server.URL))

manager, err := plugins.New(managerConfig, "test-instance-id", inmem.New())
manager, err := plugins.New(managerConfig, "test-instance-id", inmem.New(), plugins.GracefulShutdownPeriod(10))
if err != nil {
t.Fatal(err)
}
Expand Down
14 changes: 13 additions & 1 deletion plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package plugins
import (
"context"
"sync"
"time"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/bundle"
Expand Down Expand Up @@ -138,6 +139,7 @@ type Manager struct {
maxErrors int
initialized bool
interQueryBuiltinCacheConfig *cache.Config
gracefulShutdownPeriod int
}

type managerContextKey string
Expand Down Expand Up @@ -195,6 +197,13 @@ func MaxErrors(n int) func(*Manager) {
}
}

// GracefulShutdownPeriod passes the configured graceful shutdown period to plugins
func GracefulShutdownPeriod(gracefulShutdownPeriod int) func(*Manager) {
return func(m *Manager) {
m.gracefulShutdownPeriod = gracefulShutdownPeriod
}
}

// New creates a new Manager using config.
func New(raw []byte, id string, store storage.Store, opts ...func(*Manager)) (*Manager, error) {

Expand Down Expand Up @@ -382,7 +391,8 @@ func (m *Manager) Start(ctx context.Context) error {
return nil
}

// Stop stops the manager, stopping all the plugins registered with it
// Stop stops the manager, stopping all the plugins registered with it. Any plugin that needs to perform cleanup should
// do so within the duration of the graceful shutdown period passed with the context as a timeout.
func (m *Manager) Stop(ctx context.Context) {
var toStop []Plugin

Expand All @@ -395,6 +405,8 @@ func (m *Manager) Stop(ctx context.Context) {
}
}()

ctx, cancel := context.WithTimeout(ctx, time.Duration(m.gracefulShutdownPeriod)*time.Second)
defer cancel()
for i := range toStop {
toStop[i].Stop(ctx)
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) {
return nil, err
}

manager, err := plugins.New(config, params.ID, inmem.New(), plugins.Info(info), plugins.InitBundles(loaded.Bundles), plugins.InitFiles(loaded.Files), plugins.MaxErrors(params.ErrorLimit))
manager, err := plugins.New(config, params.ID, inmem.New(), plugins.Info(info), plugins.InitBundles(loaded.Bundles), plugins.InitFiles(loaded.Files), plugins.MaxErrors(params.ErrorLimit), plugins.GracefulShutdownPeriod(params.GracefulShutdownPeriod))
if err != nil {
return nil, errors.Wrap(err, "config error")
}
Expand Down

0 comments on commit a467fe9

Please sign in to comment.