Skip to content

Commit

Permalink
Add provider.ReplayInvokes (#105)
Browse files Browse the repository at this point in the history
This allows PreviewProviderUpgrade to avoid side-effects from invokes
which cause failures when the replay environment differs from the
recording environment such as local vs CI.

- Allow wrapping a provider factory and intercept invokes to replay them
from a log file.
- Expose `GetUpgradeCacheDir` so users don't have to hard code the whole
long path themselves.

Closes #43
Fixes #31

### Example usage

```go
// Construct a provider factory as usual
providerFactory := providers.ResourceProviderFactory(providerServer)
// Locate where the gRPC logs were recorded
cacheDir := providertest.GetUpgradeCacheDir(filepath.Base(dir), "5.60.0")
// Intercept invoke calls and replay responses from the log file.
factoryWithReplay := providerFactory.ReplayInvokes(filepath.Join(cacheDir, "grpc.json"), true)
```
  • Loading branch information
danielrbradley authored Sep 6, 2024
1 parent be10035 commit cecfbc7
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 25 deletions.
14 changes: 10 additions & 4 deletions previewProviderUpgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func PreviewProviderUpgrade(t pulumitest.PT, pulumiTest *pulumitest.PulumiTest,
for _, opt := range opts {
opt.Apply(&options)
}
programName := filepath.Base(pulumiTest.Source())
cacheDir := getCacheDir(options, programName, baselineVersion)
programName := filepath.Base(pulumiTest.WorkingDir())
cacheDir := GetUpgradeCacheDir(programName, baselineVersion, options.CacheDirTemplate...)
previewTest.Run(t,
func(test *pulumitest.PulumiTest) {
t.Helper()
Expand Down Expand Up @@ -54,9 +54,15 @@ func baselineProviderOpt(options optproviderupgrade.PreviewProviderUpgradeOption
}
}

func getCacheDir(options optproviderupgrade.PreviewProviderUpgradeOptions, programName string, baselineVersion string) string {
// GetUpgradeCacheDir returns the cache directory for a provider upgrade test.
// If no cacheDirTemplatePath is provided, the default cache directory is used.
func GetUpgradeCacheDir(programName, baselineVersion string, cacheDirTemplatePath ...string) string {
cacheDirTemplate := cacheDirTemplatePath
if len(cacheDirTemplate) == 0 {
cacheDirTemplate = optproviderupgrade.Defaults().CacheDirTemplate
}
var cacheDir string
for _, pathTemplateElement := range options.CacheDirTemplate {
for _, pathTemplateElement := range cacheDirTemplate {
switch pathTemplateElement {
case optproviderupgrade.ProgramName:
cacheDir = filepath.Join(cacheDir, programName)
Expand Down
18 changes: 18 additions & 0 deletions previewProviderUpgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/pulumi/providertest"
"github.com/pulumi/providertest/optproviderupgrade"
"github.com/pulumi/providertest/providers"
"github.com/pulumi/providertest/pulumitest"
"github.com/pulumi/providertest/pulumitest/assertpreview"
"github.com/pulumi/providertest/pulumitest/opttest"
Expand Down Expand Up @@ -44,3 +45,20 @@ func TestPreviewUpgradeWithKnownSourceEdit(t *testing.T) {

assert.Contains(t, previewResult.StdOut, "random:index:RandomPassword password create")
}

func TestPreviewWithInvokeReplayed(t *testing.T) {
t.Parallel()
cacheDir := t.TempDir()
commandProvider := providers.DownloadPluginBinaryFactory("command", "1.0.1")
// Intercept all invokes and replay them from a gRPC log during the preview.
commandProvider = commandProvider.ReplayInvokes(filepath.Join(cacheDir, "grpc.json"), false)
test := pulumitest.NewPulumiTest(t, filepath.Join("pulumitest", "testdata", "yaml_command_invoke"),
opttest.AttachProvider("command", commandProvider))

// We're not changing the version, but if the preview doesn't re-use the captured invoke the value will be different.
// This will cause a resource update which we can assert against.
previewResult := providertest.PreviewProviderUpgrade(t, test, "command", "1.0.1",
optproviderupgrade.CacheDir(cacheDir))

assertpreview.HasNoChanges(t, previewResult)
}
57 changes: 57 additions & 0 deletions providers/replayInvokes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package providers

import (
"context"
"fmt"
"reflect"

"github.com/pulumi/providertest/grpclog"
pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go"
)

// ReplayInvokes wraps a provider factory, intercepting all invokes and replaying them from a gRPC log.
// Example:
// providerFactory := providers.ResourceProviderFactory(providerServer)
// cacheDir := providertest.GetUpgradeCacheDir(filepath.Base(dir), "5.60.0")
// factoryWithReplay := providerFactory.ReplayInvokes(filepath.Join(cacheDir, "grpc.json"), true)
func (pf ProviderFactory) ReplayInvokes(grpcLogPath string, allowLiveFallback bool) ProviderFactory {
interceptors := ProviderInterceptors{
Invoke: func(ctx context.Context, in *pulumirpc.InvokeRequest, client pulumirpc.ResourceProviderClient) (*pulumirpc.InvokeResponse, error) {
log, err := grpclog.LoadLog(grpcLogPath)
if err != nil {
return nil, fmt.Errorf("failed to load gRPC log: %w", err)
}
invokes, err := log.Invokes()
if err != nil {
return nil, fmt.Errorf("failed to get invokes from log: %w", err)
}
requestedToken := in.GetTok()
// Avoid using range due to invokes containing sync locks.
for i := 0; i < len(invokes); i++ {
if invokes[i].Request.Tok == requestedToken {
if reflect.DeepEqual(in.Args.AsMap(), invokes[i].Request.Args.AsMap()) {
return &invokes[i].Response, nil
}
}
}
if allowLiveFallback {
return client.Invoke(ctx, in)
} else {
return nil, fmt.Errorf("failed to find invoke %s in gRPC log", requestedToken)
}
},
}
return func(ctx context.Context, pt PulumiTest) (Port, error) {
port, err := pf(ctx, pt)
if err != nil {
return -1, err
}
interceptResourceProviderServer, err := NewProviderInterceptProxy(ctx, port, interceptors)
if err != nil {
return -1, err
}
return startResourceProviderServer(ctx, pt, func(pt PulumiTest) (pulumirpc.ResourceProviderServer, error) {
return interceptResourceProviderServer, nil
})
}
}
46 changes: 25 additions & 21 deletions providers/resourceProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,32 @@ type ResourceProviderServerFactory func(PulumiTest) (pulumirpc.ResourceProviderS
// To shut down the provider, cancel the context.
func ResourceProviderFactory(makeResourceProviderServer ResourceProviderServerFactory) ProviderFactory {
return func(ctx context.Context, pt PulumiTest) (Port, error) {
cancelChannel := make(chan bool)
go func() {
<-ctx.Done()
close(cancelChannel)
}()
return startResourceProviderServer(ctx, pt, makeResourceProviderServer)
}
}

handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: cancelChannel,
Init: func(srv *grpc.Server) error {
prov, proverr := makeResourceProviderServer(pt)
if proverr != nil {
return fmt.Errorf("failed to create resource provider server: %v", proverr)
}
pulumirpc.RegisterResourceProviderServer(srv, prov)
return nil
},
Options: rpcutil.OpenTracingServerInterceptorOptions(nil),
})
if err != nil {
return 0, fmt.Errorf("fatal: %v", err)
}
func startResourceProviderServer(ctx context.Context, pt PulumiTest, makeResourceProviderServer ResourceProviderServerFactory) (Port, error) {
cancelChannel := make(chan bool)
go func() {
<-ctx.Done()
close(cancelChannel)
}()

return Port(handle.Port), nil
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: cancelChannel,
Init: func(srv *grpc.Server) error {
prov, proverr := makeResourceProviderServer(pt)
if proverr != nil {
return fmt.Errorf("failed to create resource provider server: %v", proverr)
}
pulumirpc.RegisterResourceProviderServer(srv, prov)
return nil
},
Options: rpcutil.OpenTracingServerInterceptorOptions(nil),
})
if err != nil {
return 0, fmt.Errorf("fatal: %v", err)
}

return Port(handle.Port), nil
}
32 changes: 32 additions & 0 deletions pulumitest/testdata/yaml_command_invoke/Pulumi.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: yaml_command_invoke
description: This is used to test invoke replay.
runtime: yaml
variables:
# Create a variable which always changes on every run
randomShellString1:
fn::invoke:
function: command:local:run
arguments:
command: cat /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z0-9' | head -c 10
# Create a second to check the args are deeply matched
# We therefore have a slightly different command so the command text isn't identical
randomShellString2:
fn::invoke:
function: command:local:run
arguments:
command: cat /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z0-9' | head -c 20
resources:
randomEchoResource1:
type: command:local:Command
properties:
create: echo "${randomShellString1}"
# Force this resource to be updated if the invoke changes which is easy to assert on in the preview result.
triggers:
- ${randomShellString1}
randomEchoResource2:
type: command:local:Command
properties:
create: echo "${randomShellString2}"
# Force this resource to be updated if the invoke changes which is easy to assert on in the preview result.
triggers:
- ${randomShellString2}

0 comments on commit cecfbc7

Please sign in to comment.