Skip to content

Commit

Permalink
plugins/status: add plugin support
Browse files Browse the repository at this point in the history
Added support for plugins in status plugin, similar to the pattern
employed in the decision logs plugin. By setting `status.plugin`
in configuration, you can override how the status plugin sends status
updates.

Related to #3047

Signed-off-by: Grant Shively <[email protected]>
  • Loading branch information
gshively11 authored and tsandall committed Apr 16, 2021
1 parent ad5eb05 commit dd35d6c
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 17 deletions.
1 change: 1 addition & 0 deletions docs/content/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ included in the actual bundle gzipped tarball.
| `status.service` | `string` | Yes | Name of service to use to contact remote server. |
| `status.partition_name` | `string` | No | Path segment to include in status updates. |
| `status.console` | `boolean` | No (default: `false`) | Log the status updates locally to the console. When enabled alongside a remote status update API the `service` must be configured, the default `service` selection will be disabled. |
| `status.plugin` | `string` | No | Use the named plugin for status updates. If this field exists, the other configuration fields are not required. |


### Decision Logs
Expand Down
30 changes: 30 additions & 0 deletions docs/content/management.md
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,21 @@ to track **remove** vs **upsert** mask operations.
}
```

### Decision Logs Plugin

If the default decision logging behavior does not work for your use case, you can implement your own plugin to customize decision logging, e.g. use a different transport, transform logs, etc.

```yaml
decision_logs:
plugin: my_decision_logs
plugins:
my_decision_logs:
some: property
```

Check out [this example](extensions#putting-it-together) to learn how to build and register the plugin with OPA.

## Status

OPA can periodically report status updates to remote HTTP servers. The
Expand Down Expand Up @@ -932,6 +947,21 @@ This will dump all status updates to the console. See
>`metrics.prometheus` portion of the status update in particular can create a considerable
> amount of log text at info level.

### Status Plugin

If the default status logging behavior does not work for your use case, you can implement your own plugin to customize status logging, e.g. use a different transport, transform logs, etc.

```yaml
status:
plugin: my_status_logs
plugins:
my_status_logs:
some: property
```

Check out [this example](extensions#putting-it-together) for a custom decision logs plugin, and adapt it for the [Logger interface](https://pkg.go.dev/github.com/open-policy-agent/opa/plugins/status#Logger) defined in the status plugin.

## Discovery

OPA can be configured to download bundles of policy and data, report status, and
Expand Down
2 changes: 1 addition & 1 deletion plugins/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func getPluginSet(factories map[string]plugins.Factory, manager *plugins.Manager
return nil, err
}

statusConfig, err := status.ParseConfig(config.Status, manager.Services())
statusConfig, err := status.ParseConfig(config.Status, manager.Services(), pluginNames)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/logs/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ func TestPluginRateLimitDropCountStatus(t *testing.T) {
"console": true,
}`))

config, _ := status.ParseConfig(pluginConfig, fixture.manager.Services())
config, _ := status.ParseConfig(pluginConfig, fixture.manager.Services(), nil)
p := status.New(config, fixture.manager).WithMetrics(fixture.plugin.metrics)

fixture.manager.Register(status.Name, p)
Expand Down
47 changes: 37 additions & 10 deletions plugins/status/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ import (
"github.com/open-policy-agent/opa/util"
)

// Logger defines the interface for status plugins.
type Logger interface {
plugins.Plugin

Log(context.Context, *UpdateRequestV1) error
}

// UpdateRequestV1 represents the status update message that OPA sends to
// remote HTTP endpoints.
type UpdateRequestV1 struct {
Expand Down Expand Up @@ -54,14 +61,26 @@ type Plugin struct {

// Config contains configuration for the plugin.
type Config struct {
Service string `json:"service"`
PartitionName string `json:"partition_name,omitempty"`
ConsoleLogs bool `json:"console"`
Plugin *string `json:"plugin"`
Service string `json:"service"`
PartitionName string `json:"partition_name,omitempty"`
ConsoleLogs bool `json:"console"`
}

func (c *Config) validateAndInjectDefaults(services []string) error {
func (c *Config) validateAndInjectDefaults(services []string, plugins []string) error {

if c.Service == "" && len(services) != 0 && !c.ConsoleLogs {
if c.Plugin != nil {
var found bool
for _, other := range plugins {
if other == *c.Plugin {
found = true
break
}
}
if !found {
return fmt.Errorf("invalid plugin name %q in status", *c.Plugin)
}
} else if c.Service == "" && len(services) != 0 && !c.ConsoleLogs {
// For backwards compatibility allow defaulting to the first
// service listed, but only if console logging is disabled. If enabled
// we can't tell if the deployer wanted to use only console logs or
Expand All @@ -82,16 +101,16 @@ func (c *Config) validateAndInjectDefaults(services []string) error {
}
}

// If a service wasn't found, and console logging isn't enabled.
if c.Service == "" && !c.ConsoleLogs {
return fmt.Errorf("invalid status config, must have a `service` target or `console` logging specified")
// If a plugin or service wasn't found, and console logging isn't enabled.
if c.Plugin == nil && c.Service == "" && !c.ConsoleLogs {
return fmt.Errorf("invalid status config, must have a `service`, `plugin`, or `console` logging specified")
}

return nil
}

// ParseConfig validates the config and injects default values.
func ParseConfig(config []byte, services []string) (*Config, error) {
func ParseConfig(config []byte, services []string, plugins []string) (*Config, error) {

if config == nil {
return nil, nil
Expand All @@ -103,7 +122,7 @@ func ParseConfig(config []byte, services []string) (*Config, error) {
return nil, err
}

if err := parsedConfig.validateAndInjectDefaults(services); err != nil {
if err := parsedConfig.validateAndInjectDefaults(services, plugins); err != nil {
return nil, err
}

Expand Down Expand Up @@ -270,6 +289,14 @@ func (p *Plugin) oneShot(ctx context.Context) error {
}
}

if p.config.Plugin != nil {
proxy, ok := p.manager.Plugin(*p.config.Plugin).(Logger)
if !ok {
return fmt.Errorf("plugin does not implement Logger interface")
}
return proxy.Log(ctx, req)
}

if p.config.Service != "" {
resp, err := p.manager.Client(p.config.Service).
WithJSON(req).
Expand Down
50 changes: 45 additions & 5 deletions plugins/status/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func TestPluginReconfigure(t *testing.T) {
"partition_name": "test"
}`))

config, _ := ParseConfig(pluginConfig, fixture.manager.Services())
config, _ := ParseConfig(pluginConfig, fixture.manager.Services(), nil)

fixture.plugin.Reconfigure(ctx, config)
fixture.plugin.Stop(ctx)
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestParseConfigUseDefaultServiceNoConsole(t *testing.T) {
"console": false
}`))

config, err := ParseConfig([]byte(loggerConfig), services)
config, err := ParseConfig([]byte(loggerConfig), services, nil)

if err != nil {
t.Errorf("Unexpected error: %s", err)
Expand All @@ -357,7 +357,7 @@ func TestParseConfigDefaultServiceWithConsole(t *testing.T) {
"console": true
}`))

config, err := ParseConfig([]byte(loggerConfig), services)
config, err := ParseConfig([]byte(loggerConfig), services, nil)

if err != nil {
t.Errorf("Unexpected error: %s", err)
Expand All @@ -371,7 +371,7 @@ func TestParseConfigDefaultServiceWithConsole(t *testing.T) {
func TestParseConfigDefaultServiceWithNoServiceOrConsole(t *testing.T) {
loggerConfig := []byte(fmt.Sprintf(`{}`))

_, err := ParseConfig([]byte(loggerConfig), []string{})
_, err := ParseConfig([]byte(loggerConfig), []string{}, nil)

if err == nil {
t.Errorf("Expected an error but err==nil")
Expand Down Expand Up @@ -421,7 +421,7 @@ func newTestFixture(t *testing.T, m metrics.Metrics, options ...testPluginCustom
"service": "example",
}`))

config, _ := ParseConfig(pluginConfig, manager.Services())
config, _ := ParseConfig(pluginConfig, manager.Services(), nil)
for _, option := range options {
option(config)
}
Expand Down Expand Up @@ -480,3 +480,43 @@ func testStatus() *bundle.Status {

return &status
}

type testPlugin struct {
reqs []UpdateRequestV1
}

func (*testPlugin) Start(context.Context) error {
return nil
}

func (p *testPlugin) Stop(context.Context) {
}

func (p *testPlugin) Reconfigure(context.Context, interface{}) {
}

func (p *testPlugin) Log(_ context.Context, req *UpdateRequestV1) error {
p.reqs = append(p.reqs, *req)
return nil
}

func TestPluginCustomBackend(t *testing.T) {
ctx := context.Background()
manager, _ := plugins.New(nil, "test-instance-id", inmem.New())

backend := &testPlugin{}
manager.Register("test_plugin", backend)

config, err := ParseConfig([]byte(`{"plugin": "test_plugin"}`), nil, []string{"test_plugin"})
if err != nil {
t.Fatal(err)
}

plugin := New(config, manager)
plugin.oneShot(ctx)
plugin.oneShot(ctx)

if len(backend.reqs) != 2 {
t.Fatalf("Unexpected number of reqs: expected 2, got %d: %v", len(backend.reqs), backend.reqs)
}
}

0 comments on commit dd35d6c

Please sign in to comment.