Skip to content

Commit

Permalink
plugins/discovery: Support to persist and load discovery bundle from …
Browse files Browse the repository at this point in the history
…disk

This commit adds support to persist and load discovery bundle from disk.
Only the discovery bundle itself is persisted and not the configuration produced
by the discovery bundle. A new field is introduced in OPA's discovery
configuration that can be optionally set to enable OPA to write and
read the discovery bundle from disk. This feature would enable OPA to evaluate
the discovery bundle in scenarios where it is unable to communicate with the
bundle server on start-up.

Fixes #2886

Signed-off-by: Ashutosh Narkar <[email protected]>
  • Loading branch information
ashutosh-narkar committed Feb 6, 2023
1 parent f93d0f8 commit 1630052
Show file tree
Hide file tree
Showing 7 changed files with 785 additions and 68 deletions.
23 changes: 12 additions & 11 deletions docs/content/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -840,18 +840,19 @@ included in the actual bundle gzipped tarball.

### Discovery

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `discovery.resource` | `string` | Yes | Resource path to use to download bundle from configured service. |
| Field | Type | Required | Description |
| --- | --- | --- |-------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `discovery.resource` | `string` | Yes | Resource path to use to download bundle from configured service. |
| `discovery.service` | `string` | No | Name of the service to use to contact remote server. If omitted, the configuration must contain exactly one service. Discovery will default to this service. |
| `discovery.decision` | `string` | No | The path of the decision to evaluate in the discovery bundle. By default, OPA will evaluate `data` in the discovery bundle to produce the configuration. |
| `discovery.polling.min_delay_seconds` | `int64` | No (default: `60`) | Minimum amount of time to wait between configuration downloads. |
| `discovery.polling.max_delay_seconds` | `int64` | No (default: `120`) | Maximum amount of time to wait between configuration downloads. |
| `discovery.trigger` | `string` (default: `periodic`) | No | Controls how bundle is downloaded from the remote server. Allowed values are `periodic` and `manual`. |
| `discovery.polling.long_polling_timeout_seconds` | `int64` | No | Maximum amount of time the server should wait before issuing a timeout if there's no update available. |
| `discovery.signing.keyid` | `string` | No | Name of the key to use for bundle signature verification. |
| `discovery.signing.scope` | `string` | No | Scope to use for bundle signature verification. |
| `discovery.signing.exclude_files` | `array` | No | Files in the bundle to exclude during verification. |
| `discovery.decision` | `string` | No | The path of the decision to evaluate in the discovery bundle. By default, OPA will evaluate `data` in the discovery bundle to produce the configuration. |
| `discovery.polling.min_delay_seconds` | `int64` | No (default: `60`) | Minimum amount of time to wait between configuration downloads. |
| `discovery.polling.max_delay_seconds` | `int64` | No (default: `120`) | Maximum amount of time to wait between configuration downloads. |
| `discovery.trigger` | `string` (default: `periodic`) | No | Controls how bundle is downloaded from the remote server. Allowed values are `periodic` and `manual`. |
| `discovery.polling.long_polling_timeout_seconds` | `int64` | No | Maximum amount of time the server should wait before issuing a timeout if there's no update available. |
| `discovery.signing.keyid` | `string` | No | Name of the key to use for bundle signature verification. |
| `discovery.signing.scope` | `string` | No | Scope to use for bundle signature verification. |
| `discovery.signing.exclude_files` | `array` | No | Files in the bundle to exclude during verification. |
| `discovery.persist` | `bool` | No | Persist activated discovery bundle to disk. |

> ⚠️ The plugin trigger mode configured on the discovery plugin will be inherited by the bundle, decision log
> and status plugins. For example, if the discovery plugin is configured to use the manual trigger mode, all other
Expand Down
16 changes: 16 additions & 0 deletions docs/content/management-discovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,19 @@ signature verification of a discovery bundle **CANNOT** be modified via discover
> 🚨 We recommend that if you are using discovery you should be signing the discovery bundles because those bundles
> include the keys used to verify the non-discovery bundles. However, OPA does not enforce that recommendation. You may use
> unsigned discovery bundles that themselves require non-discovery bundles to be signed.
### Discovery Bundle Persistence

OPA can optionally persist the activated discovery bundle to disk for recovery purposes. To enable
persistence, set the `discovery.persist` field to `true`. When bundle
persistence is enabled, OPA will attempt to read the discovery bundle from disk on startup. This
allows OPA to start with the most recently activated bundle in case OPA cannot communicate
with the bundle server. OPA will try to load and activate the persisted discovery bundle on a best-effort basis. Any errors
encountered during the process will be surfaced in the bundle's status update. When communication between OPA and
the bundle server is restored, the latest bundle is downloaded, activated, and persisted. Like regular bundles, only
the discovery bundle itself is persisted. The discovered configuration that is generated by evaluating the data and
policies contained in the discovery bundle will **NOT** be persisted.

{{< info >}}
By default, the discovery bundle is persisted under the current working directory of the OPA process (e.g., `./.opa/bundles/<discovery.name>/bundle.tar.gz`).
{{< /info >}}
55 changes: 55 additions & 0 deletions internal/bundle/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ package bundle
import (
"context"
"fmt"
"io"
"os"
"path/filepath"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/bundle"
Expand Down Expand Up @@ -83,3 +86,55 @@ func LoadWasmResolversFromStore(ctx context.Context, store storage.Store, txn st
}
return resolvers, nil
}

// LoadBundleFromDisk loads a previously persisted activated bundle from disk
func LoadBundleFromDisk(path, name string, bvc *bundle.VerificationConfig) (*bundle.Bundle, error) {
bundlePath := filepath.Join(path, name, "bundle.tar.gz")

if _, err := os.Stat(bundlePath); err == nil {
f, err := os.Open(filepath.Join(bundlePath))
if err != nil {
return nil, err
}
defer f.Close()

r := bundle.NewCustomReader(bundle.NewTarballLoaderWithBaseURL(f, ""))

if bvc != nil {
r = r.WithBundleVerificationConfig(bvc)
}

b, err := r.Read()
if err != nil {
return nil, err
}
return &b, nil
} else if os.IsNotExist(err) {
return nil, nil
} else {
return nil, err
}
}

// SaveBundleToDisk saves the given raw bytes representing the bundle's content to disk
func SaveBundleToDisk(path string, raw io.Reader) (string, error) {
if _, err := os.Stat(path); os.IsNotExist(err) {
err = os.MkdirAll(path, os.ModePerm)
if err != nil {
return "", err
}
}

if raw == nil {
return "", fmt.Errorf("no raw bundle bytes to persist to disk")
}

dest, err := os.CreateTemp(path, ".bundle.tar.gz.*.tmp")
if err != nil {
return "", err
}
defer dest.Close()

_, err = io.Copy(dest, raw)
return dest.Name(), err
}
47 changes: 4 additions & 43 deletions plugins/bundle/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,53 +675,14 @@ func (p *Plugin) saveBundleToDisk(name string, raw io.Reader) error {
}

func saveCurrentBundleToDisk(path string, raw io.Reader) (string, error) {
if _, err := os.Stat(path); os.IsNotExist(err) {
err = os.MkdirAll(path, os.ModePerm)
if err != nil {
return "", err
}
}

if raw == nil {
return "", fmt.Errorf("no raw bundle bytes to persist to disk")
}

dest, err := os.CreateTemp(path, ".bundle.tar.gz.*.tmp")
if err != nil {
return "", err
}
defer dest.Close()

_, err = io.Copy(dest, raw)
return dest.Name(), err
return bundleUtils.SaveBundleToDisk(path, raw)
}

func loadBundleFromDisk(path, name string, src *Source) (*bundle.Bundle, error) {
bundlePath := filepath.Join(path, name, "bundle.tar.gz")

if _, err := os.Stat(bundlePath); err == nil {
f, err := os.Open(filepath.Join(bundlePath))
if err != nil {
return nil, err
}
defer f.Close()

r := bundle.NewReader(f)

if src != nil {
r = r.WithBundleVerificationConfig(src.Signing)
}

b, err := r.Read()
if err != nil {
return nil, err
}
return &b, nil
} else if os.IsNotExist(err) {
return nil, nil
} else {
return nil, err
if src != nil {
return bundleUtils.LoadBundleFromDisk(path, name, src.Signing)
}
return bundleUtils.LoadBundleFromDisk(path, name, nil)
}

func (p *Plugin) log(name string) logging.Logger {
Expand Down
1 change: 1 addition & 0 deletions plugins/discovery/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Config struct {
Service string `json:"service"` // the name of the service used to download discovery bundle from
Resource *string `json:"resource,omitempty"` // the resource path which will be downloaded from the service
Signing *bundle.VerificationConfig `json:"signing,omitempty"` // configuration used to verify a signed bundle
Persist bool `json:"persist"` // control whether to persist activated discovery bundle to disk

service string
path string
Expand Down
147 changes: 134 additions & 13 deletions plugins/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
Expand All @@ -18,6 +19,7 @@ import (
bundleApi "github.com/open-policy-agent/opa/bundle"
"github.com/open-policy-agent/opa/config"
"github.com/open-policy-agent/opa/download"
bundleUtils "github.com/open-policy-agent/opa/internal/bundle"
cfg "github.com/open-policy-agent/opa/internal/config"
"github.com/open-policy-agent/opa/keys"
"github.com/open-policy-agent/opa/logging"
Expand All @@ -30,24 +32,33 @@ import (
"github.com/open-policy-agent/opa/storage/inmem"
)

// Name is the discovery plugin name that will be registered with the plugin manager.
const Name = "discovery"
const (
// Name is the discovery plugin name that will be registered with the plugin manager.
Name = "discovery"

// maxActivationRetry represents the maximum number of attempts
// to activate a persisted discovery bundle. The value chosen for
// maxActivationRetry ensures that too much time is not spent to activate
// a bundle that will never successfully activate.
maxActivationRetry = 10
)

// Discovery implements configuration discovery for OPA. When discovery is
// started it will periodically download a configuration bundle and try to
// reconfigure the OPA.
type Discovery struct {
manager *plugins.Manager
config *Config
factories map[string]plugins.Factory
downloader bundle.Loader // discovery bundle downloader
status *bundle.Status // discovery status
listenersMtx sync.Mutex // lock for listener map
listeners map[interface{}]func(bundle.Status) // listeners for discovery update events
etag string // discovery bundle etag for caching purposes
metrics metrics.Metrics
readyOnce sync.Once
logger logging.Logger
manager *plugins.Manager
config *Config
factories map[string]plugins.Factory
downloader bundle.Loader // discovery bundle downloader
status *bundle.Status // discovery status
listenersMtx sync.Mutex // lock for listener map
listeners map[interface{}]func(bundle.Status) // listeners for discovery update events
etag string // discovery bundle etag for caching purposes
metrics metrics.Metrics
readyOnce sync.Once
logger logging.Logger
bundlePersistPath string
}

// Factories provides a set of factory functions to use for
Expand Down Expand Up @@ -116,6 +127,15 @@ func New(manager *plugins.Manager, opts ...func(*Discovery)) (*Discovery, error)

// Start starts the dynamic discovery process if configured.
func (c *Discovery) Start(ctx context.Context) error {

bundlePersistPath, err := c.getBundlePersistPath()
if err != nil {
return err
}
c.bundlePersistPath = bundlePersistPath

c.loadAndActivateBundleFromDisk(ctx)

if c.downloader != nil {
c.downloader.Start(ctx)
} else {
Expand Down Expand Up @@ -171,6 +191,94 @@ func (c *Discovery) RegisterListener(name interface{}, f func(bundle.Status)) {
c.listeners[name] = f
}

func (c *Discovery) getBundlePersistPath() (string, error) {
persistDir, err := c.manager.Config.GetPersistenceDirectory()
if err != nil {
return "", err
}

return filepath.Join(persistDir, "bundles"), nil
}

func (c *Discovery) loadAndActivateBundleFromDisk(ctx context.Context) {

if c.config != nil && c.config.Persist {
b, err := c.loadBundleFromDisk()
if err != nil {
c.logger.Error("Failed to load discovery bundle from disk: %v", err)
c.status.SetError(err)
return
}

if b == nil {
return
}

for retry := 0; retry < maxActivationRetry; retry++ {

ps, err := c.processBundle(ctx, b)
if err != nil {
c.logger.Error("Discovery bundle processing error occurred: %v", err)
c.status.SetError(err)
continue
}

for _, p := range ps.Start {
if err := p.Start(ctx); err != nil {
c.logger.Error("Failed to start configured plugins: %v", err)
c.status.SetError(err)
return
}
}

for _, p := range ps.Reconfig {
p.Plugin.Reconfigure(ctx, p.Config)
}

c.status.SetError(nil)
c.status.SetActivateSuccess(b.Manifest.Revision)

// On the first activation success mark the plugin as being in OK state
c.readyOnce.Do(func() {
c.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateOK})
})

c.logger.Debug("Discovery bundle loaded from disk and activated successfully.")
}
}
}

func (c *Discovery) loadBundleFromDisk() (*bundleApi.Bundle, error) {
return bundleUtils.LoadBundleFromDisk(c.bundlePersistPath, *c.config.Name, c.config.Signing)
}

func (c *Discovery) saveBundleToDisk(raw io.Reader) error {

bundleDir := filepath.Join(c.bundlePersistPath, *c.config.Name)
bundleFile := filepath.Join(bundleDir, "bundle.tar.gz")

tmpFile, saveErr := saveCurrentBundleToDisk(bundleDir, raw)
if saveErr != nil {
c.logger.Error("Failed to save new discovery bundle to disk: %v", saveErr)

if err := os.Remove(tmpFile); err != nil {
c.logger.Warn("Failed to remove temp file ('%s'): %v", tmpFile, err)
}

if _, err := os.Stat(bundleFile); err == nil {
c.logger.Warn("Older version of activated discovery bundle persisted, ignoring error")
return nil
}
return saveErr
}

return os.Rename(tmpFile, bundleFile)
}

func saveCurrentBundleToDisk(path string, raw io.Reader) (string, error) {
return bundleUtils.SaveBundleToDisk(path, raw)
}

func (c *Discovery) oneShot(ctx context.Context, u download.Update) {

c.processUpdate(ctx, u)
Expand Down Expand Up @@ -211,6 +319,19 @@ func (c *Discovery) processUpdate(ctx context.Context, u download.Update) {
return
}

if c.config != nil && c.config.Persist {
c.logger.Debug("Persisting discovery bundle to disk in progress.")

err := c.saveBundleToDisk(u.Raw)
if err != nil {
c.logger.Error("Persisting discovery bundle to disk failed: %v", err)
c.status.SetError(err)
c.downloader.SetCache("")
return
}
c.logger.Debug("Discovery bundle persisted to disk successfully at path %v.", filepath.Join(c.bundlePersistPath, *c.config.Name))
}

c.status.SetError(nil)
c.status.SetActivateSuccess(u.Bundle.Manifest.Revision)

Expand Down
Loading

0 comments on commit 1630052

Please sign in to comment.