Skip to content

Commit

Permalink
Add contexts to input bindings
Browse files Browse the repository at this point in the history
Adds support for contexts in input bindings.

This is the last part of the "shutdown sequence fix": it allows shutting down input bindings before output ones.

Signed-off-by: ItalyPaleAle <[email protected]>
  • Loading branch information
ItalyPaleAle committed Jul 12, 2022
1 parent 0742818 commit ab67673
Show file tree
Hide file tree
Showing 93 changed files with 977 additions and 851 deletions.
10 changes: 4 additions & 6 deletions bindings/alicloud/dingtalk/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (t *DingTalkWebhook) Init(metadata bindings.Metadata) error {
}

// Read triggers the outgoing webhook, not yet production ready.
func (t *DingTalkWebhook) Read(handler bindings.Handler) error {
func (t *DingTalkWebhook) Read(ctx context.Context, handler bindings.Handler) error {
t.logger.Debugf("dingtalk webhook: start read input binding")

webhooks.Lock()
Expand Down Expand Up @@ -218,9 +218,7 @@ func getPostURL(urlPath, secret string) (string, error) {
func sign(secret, timestamp string) (string, error) {
stringToSign := fmt.Sprintf("%s\n%s", timestamp, secret)
h := hmac.New(sha256.New, []byte(secret))
if _, err := io.WriteString(h, stringToSign); err != nil {
return "", fmt.Errorf("sign failed. %w", err)
}

return base64.StdEncoding.EncodeToString(h.Sum(nil)), nil
_, _ = h.Write([]byte(stringToSign))
dgst := h.Sum(nil)
return base64.StdEncoding.EncodeToString(dgst), nil
}
2 changes: 1 addition & 1 deletion bindings/alicloud/dingtalk/webhook/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestBindingReadAndInvoke(t *testing.T) { //nolint:paralleltest
return nil, nil
}

err = d.Read(handler)
err = d.Read(context.Background(), handler)
require.NoError(t, err)

req := &bindings.InvokeRequest{Data: []byte(msg), Operation: bindings.GetOperation, Metadata: map[string]string{}}
Expand Down
82 changes: 53 additions & 29 deletions bindings/alicloud/nacos/nacos.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/nacos-group/nacos-sdk-go/v2/clients"
Expand Down Expand Up @@ -49,6 +50,7 @@ type configParam struct {
type Nacos struct {
settings Settings
config configParam
watchesLock sync.Mutex
watches []configParam
servers []constant.ServerConfig
logger logger.Logger
Expand All @@ -58,7 +60,10 @@ type Nacos struct {

// NewNacos returns a new Nacos instance.
func NewNacos(logger logger.Logger) *Nacos {
return &Nacos{logger: logger} //nolint:exhaustivestruct
return &Nacos{
logger: logger,
watchesLock: sync.Mutex{},
}
}

// Init implements InputBinding/OutputBinding's Init method.
Expand Down Expand Up @@ -140,19 +145,27 @@ func (n *Nacos) createConfigClient() error {
}

// Read implements InputBinding's Read method.
func (n *Nacos) Read(handler bindings.Handler) error {
func (n *Nacos) Read(ctx context.Context, handler bindings.Handler) error {
n.readHandler = handler

n.watchesLock.Lock()
for _, watch := range n.watches {
go n.startListen(watch)
go n.startListen(ctx, watch)
}
n.watchesLock.Unlock()

go func() {
// Cancel all listeners when the context is done
<-ctx.Done()
n.cancelAllListeners()
}()

return nil
}

// Close implements cancel all listeners, see https://github.com/dapr/components-contrib/issues/779
func (n *Nacos) Close() error {
n.cancelListener()
n.cancelAllListeners()

return nil
}
Expand All @@ -161,9 +174,9 @@ func (n *Nacos) Close() error {
func (n *Nacos) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
switch req.Operation {
case bindings.CreateOperation:
return n.publish(req)
return n.publish(ctx, req)
case bindings.GetOperation:
return n.fetch(req)
return n.fetch(ctx, req)
case bindings.DeleteOperation, bindings.ListOperation:
return nil, fmt.Errorf("nacos error: unsupported operation %s", req.Operation)
default:
Expand All @@ -176,12 +189,12 @@ func (n *Nacos) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation, bindings.GetOperation}
}

func (n *Nacos) startListen(config configParam) {
n.fetchAndNotify(config)
n.addListener(config)
func (n *Nacos) startListen(ctx context.Context, config configParam) {
n.fetchAndNotify(ctx, config)
n.addListener(ctx, config)
}

func (n *Nacos) fetchAndNotify(config configParam) {
func (n *Nacos) fetchAndNotify(ctx context.Context, config configParam) {
content, err := n.configClient.GetConfig(vo.ConfigParam{
DataId: config.dataID,
Group: config.group,
Expand All @@ -190,30 +203,31 @@ func (n *Nacos) fetchAndNotify(config configParam) {
})
if err != nil {
n.logger.Warnf("failed to receive nacos config %s:%s, error: %v", config.dataID, config.group, err)
} else {
n.notifyApp(config.group, config.dataID, content)
return
}
n.notifyApp(ctx, config.group, config.dataID, content)
}

func (n *Nacos) addListener(config configParam) {
func (n *Nacos) addListener(ctx context.Context, config configParam) {
err := n.configClient.ListenConfig(vo.ConfigParam{
DataId: config.dataID,
Group: config.group,
Content: "",
OnChange: n.listener,
OnChange: n.listener(ctx),
})
if err != nil {
n.logger.Warnf("failed to add nacos listener for %s:%s, error: %v", config.dataID, config.group, err)
return
}
}

func (n *Nacos) addListener4InputBinding(config configParam) {
func (n *Nacos) addListenerFoInputBinding(ctx context.Context, config configParam) {
if n.addToWatches(config) {
go n.addListener(config)
go n.addListener(ctx, config)
}
}

func (n *Nacos) publish(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
func (n *Nacos) publish(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
nacosConfigParam, err := n.findConfig(req.Metadata)
if err != nil {
return nil, err
Expand All @@ -231,7 +245,7 @@ func (n *Nacos) publish(req *bindings.InvokeRequest) (*bindings.InvokeResponse,
return nil, nil
}

func (n *Nacos) fetch(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
func (n *Nacos) fetch(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
nacosConfigParam, err := n.findConfig(req.Metadata)
if err != nil {
return nil, err
Expand All @@ -248,13 +262,15 @@ func (n *Nacos) fetch(req *bindings.InvokeRequest) (*bindings.InvokeResponse, er
}

if onchange := req.Metadata[metadataConfigOnchange]; strings.EqualFold(onchange, "true") {
n.addListener4InputBinding(*nacosConfigParam)
n.addListenerFoInputBinding(ctx, *nacosConfigParam)
}

return &bindings.InvokeResponse{Data: []byte(rst), Metadata: map[string]string{}}, nil
}

func (n *Nacos) addToWatches(c configParam) bool {
n.watchesLock.Lock()
defer n.watchesLock.Unlock()
if n.watches != nil {
for _, watch := range n.watches {
if c.dataID == watch.dataID && c.group == watch.group {
Expand Down Expand Up @@ -286,30 +302,38 @@ func (n *Nacos) findConfig(md map[string]string) (*configParam, error) {
return &nacosConfigParam, nil
}

func (n *Nacos) listener(_, group, dataID, data string) {
n.notifyApp(group, dataID, data)
func (n *Nacos) listener(ctx context.Context) func(_, group, dataID, data string) {
return func(_, group, dataID, data string) {
n.notifyApp(ctx, group, dataID, data)
}
}

func (n *Nacos) cancelListener() {
func (n *Nacos) cancelAllListeners() {
n.watchesLock.Lock()
defer n.watchesLock.Unlock()
for _, configParam := range n.watches {
if err := n.configClient.CancelListenConfig(vo.ConfigParam{ //nolint:exhaustivestruct
DataId: configParam.dataID,
Group: configParam.group,
}); err != nil {
if err := n.cancelListener(configParam); err != nil {
n.logger.Warnf("nacos cancel listener failed err: %v", err)
}
}
}

func (n *Nacos) notifyApp(group, dataID, content string) {
func (n *Nacos) cancelListener(configParam configParam) error {
return n.configClient.CancelListenConfig(vo.ConfigParam{
DataId: configParam.dataID,
Group: configParam.group,
})
}

func (n *Nacos) notifyApp(ctx context.Context, group, dataID, content string) {
metadata := map[string]string{
metadataConfigID: dataID,
metadataConfigGroup: group,
}
var err error
if n.readHandler != nil {
n.logger.Debugf("binding-nacos read content to app")
_, err = n.readHandler(context.TODO(), &bindings.ReadResponse{Data: []byte(content), Metadata: metadata})
_, err = n.readHandler(ctx, &bindings.ReadResponse{Data: []byte(content), Metadata: metadata})
} else {
err = errors.New("nacos error: the InputBinding.Read handler not init")
}
Expand Down Expand Up @@ -372,7 +396,7 @@ func convertServers(ss string) ([]constant.ServerConfig, error) {
}

func parseServerURL(s string) (*constant.ServerConfig, error) {
if !strings.HasPrefix(s, "http") {
if !strings.HasPrefix(s, "http://") {
s = "http://" + s
}
u, err := url.Parse(s)
Expand Down
6 changes: 2 additions & 4 deletions bindings/alicloud/nacos/nacos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,8 @@ func TestInputBindingRead(t *testing.T) { //nolint:paralleltest
return nil, nil
}

go func() {
err = n.Read(handler)
require.NoError(t, err)
}()
err = n.Read(context.Background(), handler)
require.NoError(t, err)

select {
case <-ch:
Expand Down
19 changes: 7 additions & 12 deletions bindings/alicloud/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ package oss
import (
"bytes"
"context"
"encoding/json"

"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
Expand All @@ -33,10 +33,10 @@ type AliCloudOSS struct {
}

type ossMetadata struct {
Endpoint string `json:"endpoint"`
AccessKeyID string `json:"accessKeyID"`
AccessKey string `json:"accessKey"`
Bucket string `json:"bucket"`
Endpoint string `json:"endpoint" mapstructure:"endpoint"`
AccessKeyID string `json:"accessKeyID" mapstructure:"accessKeyID"`
AccessKey string `json:"accessKey" mapstructure:"accessKey"`
Bucket string `json:"bucket" mapstructure:"bucket"`
}

// NewAliCloudOSS returns a new instance.
Expand Down Expand Up @@ -64,7 +64,7 @@ func (s *AliCloudOSS) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}

func (s *AliCloudOSS) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
func (s *AliCloudOSS) Invoke(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
key := ""
if val, ok := req.Metadata["key"]; ok && val != "" {
key = val
Expand All @@ -88,13 +88,8 @@ func (s *AliCloudOSS) Invoke(ctx context.Context, req *bindings.InvokeRequest) (
}

func (s *AliCloudOSS) parseMetadata(metadata bindings.Metadata) (*ossMetadata, error) {
b, err := json.Marshal(metadata.Properties)
if err != nil {
return nil, err
}

var m ossMetadata
err = json.Unmarshal(b, &m)
err := mapstructure.WeakDecode(metadata.Properties, &m)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit ab67673

Please sign in to comment.