Skip to content

Commit

Permalink
Add contexts to input bindings (#1831)
Browse files Browse the repository at this point in the history
This is the last part of the "shutdown sequence fix": it allows shutting down input bindings before output ones.

Signed-off-by: ItalyPaleAle <[email protected]>
  • Loading branch information
ItalyPaleAle authored Jul 12, 2022
1 parent 0742818 commit d38c786
Show file tree
Hide file tree
Showing 93 changed files with 977 additions and 851 deletions.
10 changes: 4 additions & 6 deletions bindings/alicloud/dingtalk/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (t *DingTalkWebhook) Init(metadata bindings.Metadata) error {
}

// Read triggers the outgoing webhook, not yet production ready.
func (t *DingTalkWebhook) Read(handler bindings.Handler) error {
func (t *DingTalkWebhook) Read(ctx context.Context, handler bindings.Handler) error {
t.logger.Debugf("dingtalk webhook: start read input binding")

webhooks.Lock()
Expand Down Expand Up @@ -218,9 +218,7 @@ func getPostURL(urlPath, secret string) (string, error) {
func sign(secret, timestamp string) (string, error) {
stringToSign := fmt.Sprintf("%s\n%s", timestamp, secret)
h := hmac.New(sha256.New, []byte(secret))
if _, err := io.WriteString(h, stringToSign); err != nil {
return "", fmt.Errorf("sign failed. %w", err)
}

return base64.StdEncoding.EncodeToString(h.Sum(nil)), nil
_, _ = h.Write([]byte(stringToSign))
dgst := h.Sum(nil)
return base64.StdEncoding.EncodeToString(dgst), nil
}
2 changes: 1 addition & 1 deletion bindings/alicloud/dingtalk/webhook/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestBindingReadAndInvoke(t *testing.T) { //nolint:paralleltest
return nil, nil
}

err = d.Read(handler)
err = d.Read(context.Background(), handler)
require.NoError(t, err)

req := &bindings.InvokeRequest{Data: []byte(msg), Operation: bindings.GetOperation, Metadata: map[string]string{}}
Expand Down
82 changes: 53 additions & 29 deletions bindings/alicloud/nacos/nacos.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/nacos-group/nacos-sdk-go/v2/clients"
Expand Down Expand Up @@ -49,6 +50,7 @@ type configParam struct {
type Nacos struct {
settings Settings
config configParam
watchesLock sync.Mutex
watches []configParam
servers []constant.ServerConfig
logger logger.Logger
Expand All @@ -58,7 +60,10 @@ type Nacos struct {

// NewNacos returns a new Nacos instance.
func NewNacos(logger logger.Logger) *Nacos {
return &Nacos{logger: logger} //nolint:exhaustivestruct
return &Nacos{
logger: logger,
watchesLock: sync.Mutex{},
}
}

// Init implements InputBinding/OutputBinding's Init method.
Expand Down Expand Up @@ -140,19 +145,27 @@ func (n *Nacos) createConfigClient() error {
}

// Read implements InputBinding's Read method.
func (n *Nacos) Read(handler bindings.Handler) error {
func (n *Nacos) Read(ctx context.Context, handler bindings.Handler) error {
n.readHandler = handler

n.watchesLock.Lock()
for _, watch := range n.watches {
go n.startListen(watch)
go n.startListen(ctx, watch)
}
n.watchesLock.Unlock()

go func() {
// Cancel all listeners when the context is done
<-ctx.Done()
n.cancelAllListeners()
}()

return nil
}

// Close implements cancel all listeners, see https://github.com/dapr/components-contrib/issues/779
func (n *Nacos) Close() error {
n.cancelListener()
n.cancelAllListeners()

return nil
}
Expand All @@ -161,9 +174,9 @@ func (n *Nacos) Close() error {
func (n *Nacos) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
switch req.Operation {
case bindings.CreateOperation:
return n.publish(req)
return n.publish(ctx, req)
case bindings.GetOperation:
return n.fetch(req)
return n.fetch(ctx, req)
case bindings.DeleteOperation, bindings.ListOperation:
return nil, fmt.Errorf("nacos error: unsupported operation %s", req.Operation)
default:
Expand All @@ -176,12 +189,12 @@ func (n *Nacos) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation, bindings.GetOperation}
}

func (n *Nacos) startListen(config configParam) {
n.fetchAndNotify(config)
n.addListener(config)
func (n *Nacos) startListen(ctx context.Context, config configParam) {
n.fetchAndNotify(ctx, config)
n.addListener(ctx, config)
}

func (n *Nacos) fetchAndNotify(config configParam) {
func (n *Nacos) fetchAndNotify(ctx context.Context, config configParam) {
content, err := n.configClient.GetConfig(vo.ConfigParam{
DataId: config.dataID,
Group: config.group,
Expand All @@ -190,30 +203,31 @@ func (n *Nacos) fetchAndNotify(config configParam) {
})
if err != nil {
n.logger.Warnf("failed to receive nacos config %s:%s, error: %v", config.dataID, config.group, err)
} else {
n.notifyApp(config.group, config.dataID, content)
return
}
n.notifyApp(ctx, config.group, config.dataID, content)
}

func (n *Nacos) addListener(config configParam) {
func (n *Nacos) addListener(ctx context.Context, config configParam) {
err := n.configClient.ListenConfig(vo.ConfigParam{
DataId: config.dataID,
Group: config.group,
Content: "",
OnChange: n.listener,
OnChange: n.listener(ctx),
})
if err != nil {
n.logger.Warnf("failed to add nacos listener for %s:%s, error: %v", config.dataID, config.group, err)
return
}
}

func (n *Nacos) addListener4InputBinding(config configParam) {
func (n *Nacos) addListenerFoInputBinding(ctx context.Context, config configParam) {
if n.addToWatches(config) {
go n.addListener(config)
go n.addListener(ctx, config)
}
}

func (n *Nacos) publish(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
func (n *Nacos) publish(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
nacosConfigParam, err := n.findConfig(req.Metadata)
if err != nil {
return nil, err
Expand All @@ -231,7 +245,7 @@ func (n *Nacos) publish(req *bindings.InvokeRequest) (*bindings.InvokeResponse,
return nil, nil
}

func (n *Nacos) fetch(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
func (n *Nacos) fetch(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
nacosConfigParam, err := n.findConfig(req.Metadata)
if err != nil {
return nil, err
Expand All @@ -248,13 +262,15 @@ func (n *Nacos) fetch(req *bindings.InvokeRequest) (*bindings.InvokeResponse, er
}

if onchange := req.Metadata[metadataConfigOnchange]; strings.EqualFold(onchange, "true") {
n.addListener4InputBinding(*nacosConfigParam)
n.addListenerFoInputBinding(ctx, *nacosConfigParam)
}

return &bindings.InvokeResponse{Data: []byte(rst), Metadata: map[string]string{}}, nil
}

func (n *Nacos) addToWatches(c configParam) bool {
n.watchesLock.Lock()
defer n.watchesLock.Unlock()
if n.watches != nil {
for _, watch := range n.watches {
if c.dataID == watch.dataID && c.group == watch.group {
Expand Down Expand Up @@ -286,30 +302,38 @@ func (n *Nacos) findConfig(md map[string]string) (*configParam, error) {
return &nacosConfigParam, nil
}

func (n *Nacos) listener(_, group, dataID, data string) {
n.notifyApp(group, dataID, data)
func (n *Nacos) listener(ctx context.Context) func(_, group, dataID, data string) {
return func(_, group, dataID, data string) {
n.notifyApp(ctx, group, dataID, data)
}
}

func (n *Nacos) cancelListener() {
func (n *Nacos) cancelAllListeners() {
n.watchesLock.Lock()
defer n.watchesLock.Unlock()
for _, configParam := range n.watches {
if err := n.configClient.CancelListenConfig(vo.ConfigParam{ //nolint:exhaustivestruct
DataId: configParam.dataID,
Group: configParam.group,
}); err != nil {
if err := n.cancelListener(configParam); err != nil {
n.logger.Warnf("nacos cancel listener failed err: %v", err)
}
}
}

func (n *Nacos) notifyApp(group, dataID, content string) {
func (n *Nacos) cancelListener(configParam configParam) error {
return n.configClient.CancelListenConfig(vo.ConfigParam{
DataId: configParam.dataID,
Group: configParam.group,
})
}

func (n *Nacos) notifyApp(ctx context.Context, group, dataID, content string) {
metadata := map[string]string{
metadataConfigID: dataID,
metadataConfigGroup: group,
}
var err error
if n.readHandler != nil {
n.logger.Debugf("binding-nacos read content to app")
_, err = n.readHandler(context.TODO(), &bindings.ReadResponse{Data: []byte(content), Metadata: metadata})
_, err = n.readHandler(ctx, &bindings.ReadResponse{Data: []byte(content), Metadata: metadata})
} else {
err = errors.New("nacos error: the InputBinding.Read handler not init")
}
Expand Down Expand Up @@ -372,7 +396,7 @@ func convertServers(ss string) ([]constant.ServerConfig, error) {
}

func parseServerURL(s string) (*constant.ServerConfig, error) {
if !strings.HasPrefix(s, "http") {
if !strings.HasPrefix(s, "http://") {
s = "http://" + s
}
u, err := url.Parse(s)
Expand Down
6 changes: 2 additions & 4 deletions bindings/alicloud/nacos/nacos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,8 @@ func TestInputBindingRead(t *testing.T) { //nolint:paralleltest
return nil, nil
}

go func() {
err = n.Read(handler)
require.NoError(t, err)
}()
err = n.Read(context.Background(), handler)
require.NoError(t, err)

select {
case <-ch:
Expand Down
19 changes: 7 additions & 12 deletions bindings/alicloud/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ package oss
import (
"bytes"
"context"
"encoding/json"

"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
Expand All @@ -33,10 +33,10 @@ type AliCloudOSS struct {
}

type ossMetadata struct {
Endpoint string `json:"endpoint"`
AccessKeyID string `json:"accessKeyID"`
AccessKey string `json:"accessKey"`
Bucket string `json:"bucket"`
Endpoint string `json:"endpoint" mapstructure:"endpoint"`
AccessKeyID string `json:"accessKeyID" mapstructure:"accessKeyID"`
AccessKey string `json:"accessKey" mapstructure:"accessKey"`
Bucket string `json:"bucket" mapstructure:"bucket"`
}

// NewAliCloudOSS returns a new instance.
Expand Down Expand Up @@ -64,7 +64,7 @@ func (s *AliCloudOSS) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}

func (s *AliCloudOSS) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
func (s *AliCloudOSS) Invoke(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
key := ""
if val, ok := req.Metadata["key"]; ok && val != "" {
key = val
Expand All @@ -88,13 +88,8 @@ func (s *AliCloudOSS) Invoke(ctx context.Context, req *bindings.InvokeRequest) (
}

func (s *AliCloudOSS) parseMetadata(metadata bindings.Metadata) (*ossMetadata, error) {
b, err := json.Marshal(metadata.Properties)
if err != nil {
return nil, err
}

var m ossMetadata
err = json.Unmarshal(b, &m)
err := mapstructure.WeakDecode(metadata.Properties, &m)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit d38c786

Please sign in to comment.