Skip to content

Commit

Permalink
POC proxy plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Billy Zaelani Malik <[email protected]>
  • Loading branch information
minizilla committed Dec 8, 2024
1 parent fb531bf commit 687daae
Show file tree
Hide file tree
Showing 7 changed files with 452 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ generate:
go build -buildmode=plugin -o ./transport/http/server/plugin/tests/lura-server-example.so ./transport/http/server/plugin/tests
go build -buildmode=plugin -o ./proxy/plugin/tests/lura-request-modifier-example.so ./proxy/plugin/tests/logger
go build -buildmode=plugin -o ./proxy/plugin/tests/lura-error-example.so ./proxy/plugin/tests/error
go build -buildmode=plugin -o ./proxy/pluginproxy/tests/logger.so ./proxy/pluginproxy/tests/logger
go build -buildmode=plugin -o ./proxy/pluginproxy/tests/error.so ./proxy/pluginproxy/tests/error

test: generate
go test -cover -race ./...
Expand Down
2 changes: 2 additions & 0 deletions proxy/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (pf defaultFactory) New(cfg *config.EndpointConfig) (p Proxy, err error) {

p = NewPluginMiddleware(pf.logger, cfg)(p)
p = NewStaticMiddleware(pf.logger, cfg)(p)
// p = newPluginProxyMiddleware(pf.logger, cfg)(p) // TODO: call plugin proxy here?
return
}

Expand All @@ -90,6 +91,7 @@ func (pf defaultFactory) newSingle(cfg *config.EndpointConfig) (Proxy, error) {
func (pf defaultFactory) newStack(backend *config.Backend) (p Proxy) {
p = pf.backendFactory(backend)
p = NewBackendPluginMiddleware(pf.logger, backend)(p)
// p = NewBackendPluginProxyMiddleware(pf.logger, backend)(p) // TODO: call plugin proxy here?
p = NewGraphQLMiddleware(pf.logger, backend)(p)
p = NewFilterHeadersMiddleware(pf.logger, backend)(p)
p = NewFilterQueryStringsMiddleware(pf.logger, backend)(p)
Expand Down
14 changes: 14 additions & 0 deletions proxy/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,20 @@ type metadataWrapper struct {
func (m metadataWrapper) Headers() map[string][]string { return m.headers }
func (m metadataWrapper) StatusCode() int { return m.statusCode }

func newResponseWrapper(ctx context.Context, r *Response) *responseWrapper {
return &responseWrapper{
ctx: ctx,
// TODO: not sure how to populate field request
data: r.Data,
isComplete: r.IsComplete,
metadata: metadataWrapper{
headers: r.Metadata.Headers,
statusCode: r.Metadata.StatusCode,
},
io: r.Io,
}
}

type responseWrapper struct {
ctx context.Context
request interface{}
Expand Down
100 changes: 100 additions & 0 deletions proxy/pluginproxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package proxy

import (
"context"
"fmt"

"github.com/luraproject/lura/v2/config"
"github.com/luraproject/lura/v2/logging"
"github.com/luraproject/lura/v2/proxy/pluginproxy"
)

func NewPluginProxyMiddleware(logger logging.Logger, endpoint *config.EndpointConfig) Middleware {
cfg, ok := endpoint.ExtraConfig[pluginproxy.Namespace].(map[string]interface{})
if !ok {
return emptyMiddlewareFallback(logger)
}
return newPluginProxyMiddleware(logger, "ENDPOINT", endpoint.Endpoint, cfg)
}

func NewBackendPluginProxyMiddleware(logger logging.Logger, remote *config.Backend) Middleware {
cfg, ok := remote.ExtraConfig[pluginproxy.Namespace].(map[string]interface{})
if !ok {
return emptyMiddlewareFallback(logger)
}
return newPluginProxyMiddleware(logger, "BACKEND",
fmt.Sprintf("%s %s -> %s", remote.ParentEndpointMethod, remote.ParentEndpoint, remote.URLPattern), cfg)
}

func newPluginProxyMiddleware(logger logging.Logger, tag, pattern string, cfg map[string]interface{}) Middleware {
plugins, ok := cfg["name"].([]interface{})
if !ok {
return emptyMiddlewareFallback(logger)
}

var proxies []pluginproxy.Handler

for _, p := range plugins {
name, ok := p.(string)
if !ok {
continue
}

h, ok := pluginproxy.GetProxy(name)
if !ok {
continue
}
proxies = append(proxies, h)
}

return func(next ...Proxy) Proxy {
if len(next) > 1 {
logger.Fatal("too many proxies for this proxy middleware: newPluginProxyMiddleware only accepts 1 proxy, got %d tag: %s, pattern: %s",
len(next), tag, pattern)
return nil
}

return func(ctx context.Context, r *Request) (*Response, error) {
proxies = append(proxies, func(context.Context, map[string]interface{}, pluginproxy.ProxyWrapper) pluginproxy.ProxyWrapper {
return func(ctx context.Context, rw pluginproxy.RequestWrapper) (pluginproxy.ResponseWrapper, error) {
r := Request{
Method: rw.Method(),
URL: rw.URL(),
Query: rw.Query(),
Path: rw.Path(),
Body: rw.Body(),
Params: rw.Params(),
Headers: rw.Headers(),
}
resp, err := next[0](ctx, &r)
return newResponseWrapper(ctx, resp), err
}
})
return executeProxies(ctx, r, cfg, proxies)
}
}
}

// executeProxies executes all proxies and expecting the last proxies to be not calling again the next proxy.
func executeProxies(ctx context.Context, r *Request, cfg map[string]interface{}, proxies []pluginproxy.Handler) (*Response, error) {
var proxy pluginproxy.ProxyWrapper

for i := len(proxies) - 1; i >= 0; i-- {
proxy = proxies[i](ctx, cfg, proxy)
}

resp, err := proxy(ctx, newRequestWrapper(ctx, r))
if err != nil {
return nil, err
}

return &Response{
Data: resp.Data(),
IsComplete: resp.IsComplete(),
Io: resp.Io(),
Metadata: Metadata{
Headers: resp.Headers(),
StatusCode: resp.StatusCode(),
},
}, nil
}
174 changes: 174 additions & 0 deletions proxy/pluginproxy/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package pluginproxy

import (
"context"
"fmt"
"io"
"net/url"
"plugin"
"strings"

"github.com/luraproject/lura/v2/logging"
luraplugin "github.com/luraproject/lura/v2/plugin"
"github.com/luraproject/lura/v2/register"
)

const Namespace = "github.com/devopsfaith/krakend/proxy/pluginproxy"

var proxyRegister = register.New()

type (
Handler = func(context.Context, map[string]interface{}, ProxyWrapper) ProxyWrapper
ProxyWrapper = func(context.Context, RequestWrapper) (ResponseWrapper, error)
RequestWrapper = interface {
Params() map[string]string
Headers() map[string][]string
Body() io.ReadCloser
Method() string
URL() *url.URL
Query() url.Values
Path() string
}
ResponseWrapper = interface {
Data() map[string]interface{}
Io() io.Reader
IsComplete() bool
Headers() map[string][]string
StatusCode() int
}
)

func RegisterProxies(name string, handler Handler) {
proxyRegister.Register(Namespace, name, handler)
}

type ProxyRegisterer interface {
RegisterProxies(func(name string, handler Handler))
}

type LoggerRegisterer interface {
RegisterLogger(interface{})
}

type ContextRegisterer interface {
RegisterContext(context.Context)
}

type RegisterProxyFunc func(
name string,
handler Handler,
)

func Load(path, pattern string, rpf RegisterProxyFunc) (int, error) {
return LoadWithLogger(path, pattern, rpf, nil)
}

func LoadWithLogger(path, pattern string, rpf RegisterProxyFunc, logger logging.Logger) (int, error) {
plugins, err := luraplugin.Scan(path, pattern)
if err != nil {
return 0, err
}
return load(plugins, rpf, logger)
}

func load(plugins []string, rpf RegisterProxyFunc, logger logging.Logger) (int, error) {
errors := []error{}
loadedPlugins := 0
for k, pluginName := range plugins {
if err := open(pluginName, rpf, logger); err != nil {
errors = append(errors, fmt.Errorf("plugin #%d (%s): %s", k, pluginName, err.Error()))
continue
}
loadedPlugins++
}

if len(errors) > 0 {
return loadedPlugins, loaderError{errors: errors}
}
return loadedPlugins, nil
}

func open(pluginName string, rpf RegisterProxyFunc, logger logging.Logger) (err error) {
defer func() {
if r := recover(); r != nil {
var ok bool
err, ok = r.(error)
if !ok {
err = fmt.Errorf("%v", r)
}
}
}()

var p Plugin
p, err = pluginOpener(pluginName)
if err != nil {
return
}
var r interface{}
r, err = p.Lookup("ProxyRegisterer")
if err != nil {
return
}
registerer, ok := r.(ProxyRegisterer)
if !ok {
return fmt.Errorf("proxy plugin loader: unknown type")
}

if logger != nil {
if lr, ok := r.(LoggerRegisterer); ok {
lr.RegisterLogger(logger)
}
}

registerer.RegisterProxies(rpf)
return
}

func GetProxy(name string) (Handler, bool) {
r, ok := proxyRegister.Get(Namespace)
if !ok {
return nil, ok
}
p, ok := r.Get(name)
if !ok {
return nil, ok
}
res, ok := p.(Handler)
if !ok {
return nil, ok
}
return res, ok
}

// Plugin is the interface of the loaded plugins
type Plugin interface {
Lookup(name string) (plugin.Symbol, error)
}

// pluginOpener keeps the plugin open function in a var for easy testing
var pluginOpener = defaultPluginOpener

func defaultPluginOpener(name string) (Plugin, error) {
return plugin.Open(name)
}

type loaderError struct {
errors []error
}

// Error implements the error interface
func (l loaderError) Error() string {
msgs := make([]string, len(l.errors))
for i, err := range l.errors {
msgs[i] = err.Error()
}
return fmt.Sprintf("plugin loader found %d error(s): \n%s", len(msgs), strings.Join(msgs, "\n"))
}

func (l loaderError) Len() int {
return len(l.errors)
}

func (l loaderError) Errs() []error {
return l.errors
}
60 changes: 60 additions & 0 deletions proxy/pluginproxy/tests/error/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// SPDX-License-Identifier: Apache-2.0

package main

import (
"context"
"errors"
"io"
"net/http"
"net/url"
)

func main() {}

var ProxyRegisterer = registerer("error")

type (
Handler = func(context.Context, map[string]interface{}, ProxyWrapper) ProxyWrapper
ProxyWrapper = func(context.Context, RequestWrapper) (ResponseWrapper, error)
RequestWrapper = interface {
Params() map[string]string
Headers() map[string][]string
Body() io.ReadCloser
Method() string
URL() *url.URL
Query() url.Values
Path() string
}
ResponseWrapper = interface {
Data() map[string]interface{}
Io() io.Reader
IsComplete() bool
Headers() map[string][]string
StatusCode() int
}
)

type registerer string

func (r registerer) RegisterProxies(f func(name string, handler Handler)) {
f(string(r), r.registerProxies)
}

func (registerer) registerProxies(context.Context, map[string]interface{}, ProxyWrapper) ProxyWrapper {
return func(ctx context.Context, rw RequestWrapper) (ResponseWrapper, error) {
return nil, requestErr
}
}

type customError struct {
error
statusCode int
}

func (r customError) StatusCode() int { return r.statusCode }

var requestErr = customError{
error: errors.New("request rejected just because"),
statusCode: http.StatusTeapot,
}
Loading

0 comments on commit 687daae

Please sign in to comment.