Skip to content

Commit

Permalink
wasm-sdk: use wasmtime.Linker for resolving imports, only have one wa…
Browse files Browse the repository at this point in the history
…smtime.Engine instance per pool (#3764)

* wasm-sdk: use wasmtime.Linker for resolving imports

No functional change here, but this is easier to read and understand:
anything the module needs, the linker will try to satisfy using the
defined module/name entries ("env.memory", "env.opa_println", etc).

* wasm-sdk: only use one *wasmtime.Engine instance

This should bring us a little more in-line with the
https://docs.rs/wasmtime/0.29.0/wasmtime/#example-architecture

> When the server starts, we’ll start off by creating an Engine (and
> maybe tweaking Config settings if necessary). This Engine will be
> the only engine for the lifetime of the server itself.
> Next, we can compile our WebAssembly.

Signed-off-by: Stephan Renatus <[email protected]>
  • Loading branch information
srenatus authored Aug 27, 2021
1 parent 0b7a2c3 commit 968220f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 21 deletions.
12 changes: 10 additions & 2 deletions internal/wasm/sdk/internal/wasm/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"context"
"sync"

"github.com/bytecodealliance/wasmtime-go"

"github.com/open-policy-agent/opa/internal/wasm/sdk/opa/errors"
"github.com/open-policy-agent/opa/metrics"
)
Expand All @@ -17,6 +19,7 @@ var errNotReady = errors.New(errors.NotReadyErr, "")

// Pool maintains a pool of WebAssemly VM instances.
type Pool struct {
engine *wasmtime.Engine
available chan struct{}
mutex sync.Mutex
dataMtx sync.Mutex
Expand All @@ -35,12 +38,17 @@ type Pool struct {

// NewPool constructs a new pool with the pool and VM configuration provided.
func NewPool(poolSize, memoryMinPages, memoryMaxPages uint32) *Pool {

cfg := wasmtime.NewConfig()
cfg.SetInterruptable(true)

available := make(chan struct{}, poolSize)
for i := uint32(0); i < poolSize; i++ {
available <- struct{}{}
}

return &Pool{
engine: wasmtime.NewEngineWithConfig(cfg),
memoryMinPages: memoryMinPages,
memoryMaxPages: memoryMaxPages,
available: available,
Expand Down Expand Up @@ -106,7 +114,7 @@ func (p *Pool) Acquire(ctx context.Context, metrics metrics.Metrics) (*VM, error
parsedDataAddr: parsedDataAddr,
memoryMin: p.memoryMinPages,
memoryMax: p.memoryMaxPages,
})
}, p.engine)
p.mutex.Lock()

if err != nil {
Expand Down Expand Up @@ -168,7 +176,7 @@ func (p *Pool) SetPolicyData(ctx context.Context, policy []byte, data []byte) er
parsedDataAddr: 0,
memoryMin: p.memoryMinPages,
memoryMax: p.memoryMaxPages,
})
}, p.engine)

if err == nil {
parsedDataAddr, parsedData := vm.cloneDataSegment()
Expand Down
32 changes: 13 additions & 19 deletions internal/wasm/sdk/internal/wasm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
// VM is a wrapper around a Wasm VM instance
type VM struct {
dispatcher *builtinDispatcher
engine *wasmtime.Engine
store *wasmtime.Store
instance *wasmtime.Instance // Pointer to avoid unintented destruction (triggering finalizers within).
intHandle *wasmtime.InterruptHandle
Expand Down Expand Up @@ -67,12 +68,10 @@ type vmOpts struct {
memoryMax uint32
}

func newVM(opts vmOpts) (*VM, error) {
func newVM(opts vmOpts, engine *wasmtime.Engine) (*VM, error) {
ctx := context.Background()
v := &VM{}
cfg := wasmtime.NewConfig()
cfg.SetInterruptable(true)
store := wasmtime.NewStore(wasmtime.NewEngineWithConfig(cfg))
v := &VM{engine: engine}
store := wasmtime.NewStore(engine)
memorytype := wasmtime.NewMemoryType(wasmtime.Limits{Min: opts.memoryMin, Max: opts.memoryMax})
memory, err := wasmtime.NewMemory(store, memorytype)
if err != nil {
Expand All @@ -84,24 +83,19 @@ func newVM(opts vmOpts) (*VM, error) {
return nil, err
}

linker := wasmtime.NewLinker(store.Engine)
v.dispatcher = newBuiltinDispatcher()
externs := opaFunctions(v.dispatcher, store)
imports := []wasmtime.AsExtern{}
for _, imp := range module.Type().Imports() {
if imp.Type().MemoryType() != nil {
imports = append(imports, memory)
}
if imp.Type().FuncType() == nil {
continue
}
if ext, ok := externs[*imp.Name()]; ok {
imports = append(imports, ext)
} else {
return nil, fmt.Errorf("cannot provide import %s", *imp.Name())
for name, extern := range externs {
if err := linker.Define("env", name, extern); err != nil {
return nil, fmt.Errorf("linker: env.%s: %w", name, err)
}
}
if err := linker.Define("env", "memory", memory); err != nil {
return nil, fmt.Errorf("linker: env.memory: %w", err)
}

i, err := wasmtime.NewInstance(store, module, imports)
i, err := linker.Instantiate(store, module)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -430,7 +424,7 @@ func (i *VM) SetPolicyData(ctx context.Context, opts vmOpts) error {

if !bytes.Equal(opts.policy, i.policy) {
// Swap the instance to a new one, with new policy.
n, err := newVM(opts)
n, err := newVM(opts, i.engine)
if err != nil {
return err
}
Expand Down

0 comments on commit 968220f

Please sign in to comment.