From e6cc0aa27e8be3e4b133455b3c1085dbff27ee8a Mon Sep 17 00:00:00 2001 From: mmonier Date: Thu, 28 Nov 2019 11:44:36 +0200 Subject: [PATCH 1/4] add kafka and nats executer --- Dockerfile | 8 +- builtin/bins/dkron-executor-kafka/kafka.go | 93 +++++++++++++++++++ .../bins/dkron-executor-kafka/kafka_test.go | 27 ++++++ builtin/bins/dkron-executor-kafka/main.go | 18 ++++ builtin/bins/dkron-executor-nats/main.go | 18 ++++ builtin/bins/dkron-executor-nats/nats.go | 76 +++++++++++++++ builtin/bins/dkron-executor-nats/nats_test.go | 27 ++++++ docker-compose.yml | 2 +- go.mod | 2 + go.sum | 46 +++++++++ 10 files changed, 313 insertions(+), 4 deletions(-) create mode 100644 builtin/bins/dkron-executor-kafka/kafka.go create mode 100644 builtin/bins/dkron-executor-kafka/kafka_test.go create mode 100644 builtin/bins/dkron-executor-kafka/main.go create mode 100644 builtin/bins/dkron-executor-nats/main.go create mode 100644 builtin/bins/dkron-executor-nats/nats.go create mode 100644 builtin/bins/dkron-executor-nats/nats_test.go diff --git a/Dockerfile b/Dockerfile index 4c0e30b22..4d87fb0bf 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,7 +2,7 @@ FROM golang:1.13 LABEL maintainer="Victor Castell " EXPOSE 8080 8946 - +USER root RUN mkdir -p /app WORKDIR /app @@ -12,6 +12,8 @@ COPY go.sum go.sum RUN go mod download COPY . . -#RUN go install ./... -#CMD ["dkron"] +RUN go install ./... +RUN chmod -R 777 /app + +# CMD ["dkron","agent","--server"] diff --git a/builtin/bins/dkron-executor-kafka/kafka.go b/builtin/bins/dkron-executor-kafka/kafka.go new file mode 100644 index 000000000..cb4b438b4 --- /dev/null +++ b/builtin/bins/dkron-executor-kafka/kafka.go @@ -0,0 +1,93 @@ +package main + +import ( + "errors" + "log" + "github.com/Shopify/sarama" + "github.com/armon/circbuf" + "github.com/distribworks/dkron/v2/dkron" +) + +const ( + // maxBufSize limits how much data we collect from a handler. + // This is to prevent Serf's memory from growing to an enormous + // amount due to a faulty handler. + maxBufSize = 500000 +) + +// Kafka process kafka request +type Kafka struct { +} + +// Execute Process method of the plugin +// "executor": "kafka", +// "executor_config": { +// "url": "http://example.com", // kafka server url +// "message": "", // +// "topic": "publishTopic", // +// } +func (s *Kafka) Execute(args *dkron.ExecuteRequest) (*dkron.ExecuteResponse, error) { + + out, err := s.ExecuteImpl(args) + resp := &dkron.ExecuteResponse{Output: out} + if err != nil { + resp.Error = err.Error() + } + return resp, nil +} + +// ExecuteImpl do http request +func (s *Kafka) ExecuteImpl(args *dkron.ExecuteRequest) ([]byte, error) { + + output, _ := circbuf.NewBuffer(maxBufSize) + + var debug bool + if args.Config["debug"] != "" { + debug = true + log.Printf("config %#v\n\n", args.Config) + } + + if args.Config["url"] == "" { + + return output.Bytes(), errors.New("url is empty") + } + + if args.Config["topic"] == "" { + return output.Bytes(), errors.New("topic is empty") + } + config := sarama.NewConfig() + config.Producer.RequiredAcks = sarama.WaitForAll + config.Producer.Retry.Max = 5 + config.Producer.Return.Successes = true + config.Producer.Return.Errors = true + + // brokers := []string{"192.168.59.103:9092"} + brokers := []string{args.Config["url"]} + producer, err := sarama.NewSyncProducer(brokers, config) + if err != nil { + // Should not reach here + + if debug { + log.Printf("request %#v\n\n", config) + } + return output.Bytes(), err + } + + topic := args.Config["topic"] + msg := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.StringEncoder(args.Config["message"]), + } + + _, _, err = producer.SendMessage(msg) + + if err != nil { + return output.Bytes(), err + } + defer func() { + producer.Close() + }() + + output.Write([]byte("Result: success to publish data\n")) + return output.Bytes(), nil +} diff --git a/builtin/bins/dkron-executor-kafka/kafka_test.go b/builtin/bins/dkron-executor-kafka/kafka_test.go new file mode 100644 index 000000000..fc4378807 --- /dev/null +++ b/builtin/bins/dkron-executor-kafka/kafka_test.go @@ -0,0 +1,27 @@ +package main + +import ( + "fmt" + "testing" + + "github.com/distribworks/dkron/v2/dkron" +) + +func TestPublishExecute(t *testing.T) { + pa := &dkron.ExecuteRequest{ + JobName: "testJob", + Config: map[string]string{ + "topic": "test", + "url": "tesr", + "message": "{\"hello\":11}", + "debug": "true", + }, + } + kafka := &Kafka{} + output, err := kafka.Execute(pa) + fmt.Println(string(output.Output)) + fmt.Println(err) + if err != nil { + t.Fatal(err) + } +} diff --git a/builtin/bins/dkron-executor-kafka/main.go b/builtin/bins/dkron-executor-kafka/main.go new file mode 100644 index 000000000..a298e098b --- /dev/null +++ b/builtin/bins/dkron-executor-kafka/main.go @@ -0,0 +1,18 @@ +package main + +import ( + dkplugin "github.com/distribworks/dkron/v2/plugin" + "github.com/hashicorp/go-plugin" +) + +func main() { + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: dkplugin.Handshake, + Plugins: map[string]plugin.Plugin{ + "executor": &dkplugin.ExecutorPlugin{Executor: &Kafka{}}, + }, + + // A non-nil value here enables gRPC serving for this plugin... + GRPCServer: plugin.DefaultGRPCServer, + }) +} diff --git a/builtin/bins/dkron-executor-nats/main.go b/builtin/bins/dkron-executor-nats/main.go new file mode 100644 index 000000000..1589d9513 --- /dev/null +++ b/builtin/bins/dkron-executor-nats/main.go @@ -0,0 +1,18 @@ +package main + +import ( + dkplugin "github.com/distribworks/dkron/v2/plugin" + "github.com/hashicorp/go-plugin" +) + +func main() { + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: dkplugin.Handshake, + Plugins: map[string]plugin.Plugin{ + "executor": &dkplugin.ExecutorPlugin{Executor: &Nats{}}, + }, + + // A non-nil value here enables gRPC serving for this plugin... + GRPCServer: plugin.DefaultGRPCServer, + }) +} diff --git a/builtin/bins/dkron-executor-nats/nats.go b/builtin/bins/dkron-executor-nats/nats.go new file mode 100644 index 000000000..2606a4eb9 --- /dev/null +++ b/builtin/bins/dkron-executor-nats/nats.go @@ -0,0 +1,76 @@ +package main + +import ( + "errors" + "log" + + "github.com/armon/circbuf" + "github.com/distribworks/dkron/v2/dkron" + "github.com/nats-io/nats.go" +) + +const ( + // maxBufSize limits how much data we collect from a handler. + // This is to prevent Serf's memory from growing to an enormous + // amount due to a faulty handler. + maxBufSize = 500000 +) + +// Nats process http request +type Nats struct { +} + +// Execute Process method of the plugin +// "executor": "nats", +// "executor_config": { +// "url": "http://example.com", // nats server url +// "message": "", // +// "topic": "publishTopic", // +// "userName":"test@hbh.dfg", +// "password":"dfdffs" +// } +func (s *Nats) Execute(args *dkron.ExecuteRequest) (*dkron.ExecuteResponse, error) { + + out, err := s.ExecuteImpl(args) + resp := &dkron.ExecuteResponse{Output: out} + if err != nil { + resp.Error = err.Error() + } + return resp, nil +} + +// ExecuteImpl do http request +func (s *Nats) ExecuteImpl(args *dkron.ExecuteRequest) ([]byte, error) { + + output, _ := circbuf.NewBuffer(maxBufSize) + + var debug bool + if args.Config["debug"] != "" { + debug = true + log.Printf("config %#v\n\n", args.Config) + } + + if args.Config["url"] == "" { + + return output.Bytes(), errors.New("url is empty") + } + + if args.Config["topic"] == "" { + return output.Bytes(), errors.New("topic is empty") + } + nc, err := nats.Connect(args.Config["url"], nats.UserInfo(args.Config["userName"], args.Config["password"])) + + if err != nil { + return output.Bytes(), errors.New("Error At Nats Connection") + } + + nc.Publish(args.Config["topic"], []byte(args.Config["message"])) + + output.Write([]byte("Result: success to publish data\n")) + + if debug { + log.Printf("request %#v\n\n", nc) + } + + return output.Bytes(), nil +} diff --git a/builtin/bins/dkron-executor-nats/nats_test.go b/builtin/bins/dkron-executor-nats/nats_test.go new file mode 100644 index 000000000..30d8507fe --- /dev/null +++ b/builtin/bins/dkron-executor-nats/nats_test.go @@ -0,0 +1,27 @@ +package main + +import ( + "fmt" + "testing" + + "github.com/distribworks/dkron/v2/dkron" +) + +func TestPublishExecute(t *testing.T) { + pa := &dkron.ExecuteRequest{ + JobName: "testJob", + Config: map[string]string{ + "topic": "opcuaReadRequest", + "url": "localhost:4222", + "message": "{\"hello\":11}", + "debug": "true", + }, + } + nats := &Nats{} + output, err := nats.Execute(pa) + fmt.Println(string(output.Output)) + fmt.Println(err) + if err != nil { + t.Fatal(err) + } +} diff --git a/docker-compose.yml b/docker-compose.yml index f13e28687..665002ff4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -25,4 +25,4 @@ services: - "8946" environment: - GODEBUG=netdns=go - command: dkron agent --retry-join=dkron:8946 --log-level=debug + command: dkron agent --retry-join=dkron:8946 --log-level=debug --tag="my_role=agent" diff --git a/go.mod b/go.mod index 3c565adf1..200d7871f 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,7 @@ module github.com/distribworks/dkron/v2 require ( github.com/DataDog/datadog-go v0.0.0-20170427165718-0ddda6bee211 // indirect + github.com/Shopify/sarama v1.24.1 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da github.com/aws/aws-sdk-go v1.16.23 // indirect @@ -33,6 +34,7 @@ require ( github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1 github.com/mattn/go-shellwords v0.0.0-20160315040826-525bedee691b github.com/mitchellh/go-testing-interface v1.0.0 // indirect + github.com/nats-io/nats.go v1.9.1 github.com/pascaldekloe/goe v0.1.0 // indirect github.com/ryanuber/columnize v2.1.0+incompatible github.com/sirupsen/logrus v1.2.0 diff --git a/go.sum b/go.sum index c06b0bcfa..70153fa2a 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,9 @@ github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM= github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/Shopify/sarama v1.24.1 h1:svn9vfN3R1Hz21WR2Gj0VW9ehaDGkiOS+VqlIcZOkMI= +github.com/Shopify/sarama v1.24.1/go.mod h1:fGP8eQ6PugKEI0iUETYYtnP6d1pH/bdDMTel1X5ajsU= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/abdullin/seq v0.0.0-20160510034733-d5467c17e7af h1:DBNMBMuMiWYu0b+8KMJuWmfCkcxl09JwdlqwDZZ6U14= github.com/abdullin/seq v0.0.0-20160510034733-d5467c17e7af/go.mod h1:5Jv4cbFiHJMsVxt52+i0Ha45fjshj6wxYr1r19tB9bw= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA= @@ -55,7 +58,15 @@ github.com/dnaeon/go-vcr v1.0.1 h1:r8L/HqC0Hje5AXMu1ooW8oyQyOFv4GxqpL0nRP7SLLY= github.com/dnaeon/go-vcr v1.0.1/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyGc8n1E= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU= +github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.4.1/go.mod h1:36zfPVQyHxymz4cH7wlDmVwDrJuljRB60qkgn7rorfQ= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= @@ -87,6 +98,7 @@ github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135 h1:zLTLjkaOFEFIOxY5BWLFLwh+cL8vOBW4XJ2aqLE/Tf0= github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck= @@ -155,6 +167,8 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jarcoal/httpmock v0.0.0-20180424175123-9c70cfe4a1da h1:FjHUJJ7oBW4G/9j1KzlHaXL09LyMVM9rupS39lncbXk= github.com/jarcoal/httpmock v0.0.0-20180424175123-9c70cfe4a1da/go.mod h1:ks+b9deReOc7jgqp+e7LuFiCBH6Rm5hL32cLcEAArb4= +github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM= +github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= @@ -169,6 +183,8 @@ github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1 h1:PJPDf8OUfOK1bb/NeTKd4f1QXZItOX389VN3B6qC8ro= github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= +github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs= +github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= @@ -206,6 +222,14 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/nats-io/jwt v0.3.0 h1:xdnzwFETV++jNc4W1mw//qFyJGb2ABOombmZJQS4+Qo= +github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= +github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ= +github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= +github.com/nats-io/nkeys v0.1.0 h1:qMd4+pRHgdr1nAClu+2h/2a5F2TmKcCzjCDazVgRoX4= +github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW7hr4IVovMwWg0E0PYcyW8CzqDcVmaew9cujU4s= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2/go.mod h1:TLb2Sg7HQcgGdloNxkrmtgDNR9uVYF3lfdFIN4Ro6Sk= github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw= @@ -222,12 +246,16 @@ github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181 github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pierrec/lz4 v2.2.6+incompatible h1:6aCX4/YZ9v8q69hTyiR7dNLnTA3fgtKHVVW5BCd5Znw= +github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03 h1:Wdi9nwnhFNAlseAOekn6B5G/+GMtks9UKbvRU/CMM/o= github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03/go.mod h1:gRAiPF5C5Nd0eyyRdqIu9qTiFSoZzpTq727b5B8fkkU= github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo= @@ -281,12 +309,18 @@ github.com/ugorji/go/codec v1.1.5-pre h1:5YV9PsFAN+ndcCtTM7s60no7nY7eTG3LPtxhSwu github.com/ugorji/go/codec v1.1.5-pre/go.mod h1:tULtS6Gy1AE1yCENaw4Vb//HLH5njI2tfCQDUqRd8fI= github.com/vmware/govmomi v0.18.0 h1:f7QxSmP7meCtoAmiKZogvVbLInT+CZx6Px6K5rYsJZo= github.com/vmware/govmomi v0.18.0/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 h1:bselrhR0Or1vomJZC8ZIjWtbDmn9OYFLX5Ik9alpJpE= +golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -298,6 +332,7 @@ golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd h1:HuTn7WObtcDo9uEEU7rEqL0jYthdXAmZ6PP+meazmaU= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20170807180024-9a379c6b3e95/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -320,6 +355,8 @@ golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= @@ -357,6 +394,15 @@ gopkg.in/go-playground/validator.v8 v8.18.2 h1:lFB4DoMU6B626w8ny76MV7VX6W2VHct2G gopkg.in/go-playground/validator.v8 v8.18.2/go.mod h1:RX2a/7Ha8BgOhfk7j780h4/u/RRjR0eouCJSH80/M2Y= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= +gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= +gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= +gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010= +gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= +gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= gopkg.in/resty.v1 v1.12.0 h1:CuXP0Pjfw9rOuY6EP+UvtNvt5DSqHpIxILZKT/quCZI= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= From 27246e7106aaeb4d4ff05b575be411153b7d62f7 Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Fri, 20 Nov 2020 22:51:26 +0100 Subject: [PATCH 2/4] Update nats and kafka interfaces to dkron v3 --- Dockerfile | 2 +- builtin/bins/dkron-executor-kafka/kafka.go | 11 +++++++---- builtin/bins/dkron-executor-kafka/kafka_test.go | 6 +++--- builtin/bins/dkron-executor-kafka/main.go | 2 +- builtin/bins/dkron-executor-nats/main.go | 2 +- builtin/bins/dkron-executor-nats/nats.go | 10 ++++++---- builtin/bins/dkron-executor-nats/nats_test.go | 6 +++--- docker-compose.yml | 2 +- go.mod | 2 ++ go.sum | 11 +++++++++++ 10 files changed, 36 insertions(+), 18 deletions(-) diff --git a/Dockerfile b/Dockerfile index fde985410..9047a518d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,7 +2,7 @@ FROM golang:1.15 LABEL maintainer="Victor Castell " EXPOSE 8080 8946 -USER root + RUN mkdir -p /app WORKDIR /app diff --git a/builtin/bins/dkron-executor-kafka/kafka.go b/builtin/bins/dkron-executor-kafka/kafka.go index cb4b438b4..b12ee0ad5 100644 --- a/builtin/bins/dkron-executor-kafka/kafka.go +++ b/builtin/bins/dkron-executor-kafka/kafka.go @@ -3,9 +3,12 @@ package main import ( "errors" "log" + "github.com/Shopify/sarama" "github.com/armon/circbuf" - "github.com/distribworks/dkron/v2/dkron" + + dkplugin "github.com/distribworks/dkron/v3/plugin" + dktypes "github.com/distribworks/dkron/v3/plugin/types" ) const ( @@ -26,10 +29,10 @@ type Kafka struct { // "message": "", // // "topic": "publishTopic", // // } -func (s *Kafka) Execute(args *dkron.ExecuteRequest) (*dkron.ExecuteResponse, error) { +func (s *Kafka) Execute(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelper) (*dktypes.ExecuteResponse, error) { out, err := s.ExecuteImpl(args) - resp := &dkron.ExecuteResponse{Output: out} + resp := &dktypes.ExecuteResponse{Output: out} if err != nil { resp.Error = err.Error() } @@ -37,7 +40,7 @@ func (s *Kafka) Execute(args *dkron.ExecuteRequest) (*dkron.ExecuteResponse, err } // ExecuteImpl do http request -func (s *Kafka) ExecuteImpl(args *dkron.ExecuteRequest) ([]byte, error) { +func (s *Kafka) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) { output, _ := circbuf.NewBuffer(maxBufSize) diff --git a/builtin/bins/dkron-executor-kafka/kafka_test.go b/builtin/bins/dkron-executor-kafka/kafka_test.go index fc4378807..1a746307a 100644 --- a/builtin/bins/dkron-executor-kafka/kafka_test.go +++ b/builtin/bins/dkron-executor-kafka/kafka_test.go @@ -4,11 +4,11 @@ import ( "fmt" "testing" - "github.com/distribworks/dkron/v2/dkron" + dktypes "github.com/distribworks/dkron/v3/plugin/types" ) func TestPublishExecute(t *testing.T) { - pa := &dkron.ExecuteRequest{ + pa := &dktypes.ExecuteRequest{ JobName: "testJob", Config: map[string]string{ "topic": "test", @@ -18,7 +18,7 @@ func TestPublishExecute(t *testing.T) { }, } kafka := &Kafka{} - output, err := kafka.Execute(pa) + output, err := kafka.Execute(pa, nil) fmt.Println(string(output.Output)) fmt.Println(err) if err != nil { diff --git a/builtin/bins/dkron-executor-kafka/main.go b/builtin/bins/dkron-executor-kafka/main.go index a298e098b..5754d0499 100644 --- a/builtin/bins/dkron-executor-kafka/main.go +++ b/builtin/bins/dkron-executor-kafka/main.go @@ -1,7 +1,7 @@ package main import ( - dkplugin "github.com/distribworks/dkron/v2/plugin" + dkplugin "github.com/distribworks/dkron/v3/plugin" "github.com/hashicorp/go-plugin" ) diff --git a/builtin/bins/dkron-executor-nats/main.go b/builtin/bins/dkron-executor-nats/main.go index 1589d9513..d0c63db8c 100644 --- a/builtin/bins/dkron-executor-nats/main.go +++ b/builtin/bins/dkron-executor-nats/main.go @@ -1,7 +1,7 @@ package main import ( - dkplugin "github.com/distribworks/dkron/v2/plugin" + dkplugin "github.com/distribworks/dkron/v3/plugin" "github.com/hashicorp/go-plugin" ) diff --git a/builtin/bins/dkron-executor-nats/nats.go b/builtin/bins/dkron-executor-nats/nats.go index 2606a4eb9..f8c398f2e 100644 --- a/builtin/bins/dkron-executor-nats/nats.go +++ b/builtin/bins/dkron-executor-nats/nats.go @@ -5,8 +5,10 @@ import ( "log" "github.com/armon/circbuf" - "github.com/distribworks/dkron/v2/dkron" "github.com/nats-io/nats.go" + + dkplugin "github.com/distribworks/dkron/v3/plugin" + dktypes "github.com/distribworks/dkron/v3/plugin/types" ) const ( @@ -29,10 +31,10 @@ type Nats struct { // "userName":"test@hbh.dfg", // "password":"dfdffs" // } -func (s *Nats) Execute(args *dkron.ExecuteRequest) (*dkron.ExecuteResponse, error) { +func (s *Nats) Execute(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelper) (*dktypes.ExecuteResponse, error) { out, err := s.ExecuteImpl(args) - resp := &dkron.ExecuteResponse{Output: out} + resp := &dktypes.ExecuteResponse{Output: out} if err != nil { resp.Error = err.Error() } @@ -40,7 +42,7 @@ func (s *Nats) Execute(args *dkron.ExecuteRequest) (*dkron.ExecuteResponse, erro } // ExecuteImpl do http request -func (s *Nats) ExecuteImpl(args *dkron.ExecuteRequest) ([]byte, error) { +func (s *Nats) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) { output, _ := circbuf.NewBuffer(maxBufSize) diff --git a/builtin/bins/dkron-executor-nats/nats_test.go b/builtin/bins/dkron-executor-nats/nats_test.go index 30d8507fe..94a4889da 100644 --- a/builtin/bins/dkron-executor-nats/nats_test.go +++ b/builtin/bins/dkron-executor-nats/nats_test.go @@ -4,11 +4,11 @@ import ( "fmt" "testing" - "github.com/distribworks/dkron/v2/dkron" + dktypes "github.com/distribworks/dkron/v3/plugin/types" ) func TestPublishExecute(t *testing.T) { - pa := &dkron.ExecuteRequest{ + pa := &dktypes.ExecuteRequest{ JobName: "testJob", Config: map[string]string{ "topic": "opcuaReadRequest", @@ -18,7 +18,7 @@ func TestPublishExecute(t *testing.T) { }, } nats := &Nats{} - output, err := nats.Execute(pa) + output, err := nats.Execute(pa, nil) fmt.Println(string(output.Output)) fmt.Println(err) if err != nil { diff --git a/docker-compose.yml b/docker-compose.yml index 15cf17d3b..851dc4149 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -26,4 +26,4 @@ services: - "6868" environment: - GODEBUG=netdns=go - command: dkron agent --retry-join=dkron:8946 --log-level=debug --tag="my_role=agent" + command: dkron agent --retry-join=dkron:8946 --log-level=debug diff --git a/go.mod b/go.mod index 0a7713c70..b21d06e95 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,7 @@ module github.com/distribworks/dkron/v3 require ( github.com/DataDog/datadog-go v4.0.0+incompatible // indirect + github.com/Shopify/sarama v1.19.0 github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 github.com/armon/go-metrics v0.3.4 github.com/aws/aws-sdk-go v1.34.17 // indirect @@ -29,6 +30,7 @@ require ( github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 github.com/mattn/go-shellwords v1.0.10 github.com/mitchellh/go-testing-interface v1.14.1 // indirect + github.com/nats-io/nats.go v1.9.1 github.com/philhofer/fwd v1.0.0 // indirect github.com/prometheus/client_golang v1.8.0 github.com/robfig/cron/v3 v3.0.1 diff --git a/go.sum b/go.sum index c6c4b4e84..86a450bed 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,7 @@ github.com/DataDog/datadog-go v4.0.0+incompatible h1:Dq8Dr+4sV1gBO1sHDWdW+4G+Pds github.com/DataDog/datadog-go v4.0.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/Shopify/sarama v1.19.0 h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= @@ -121,8 +122,11 @@ github.com/distribworks/dkron/v2 v2.2.2/go.mod h1:xVjBklxBOSI9Zq4a/DV40T++j9LI2A github.com/dnaeon/go-vcr v1.0.1 h1:r8L/HqC0Hje5AXMu1ooW8oyQyOFv4GxqpL0nRP7SLLY= github.com/dnaeon/go-vcr v1.0.1/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyGc8n1E= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= @@ -211,6 +215,7 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= @@ -464,11 +469,15 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= +github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= +github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= +github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW7hr4IVovMwWg0E0PYcyW8CzqDcVmaew9cujU4s= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2/go.mod h1:TLb2Sg7HQcgGdloNxkrmtgDNR9uVYF3lfdFIN4Ro6Sk= @@ -505,6 +514,7 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= +github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -560,6 +570,7 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.2.0 h1:wH4vA7pcjKuZzjF7lM8awk4fnuJO6idemZXoKnULUx4= github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03 h1:Wdi9nwnhFNAlseAOekn6B5G/+GMtks9UKbvRU/CMM/o= github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03/go.mod h1:gRAiPF5C5Nd0eyyRdqIu9qTiFSoZzpTq727b5B8fkkU= From bcc8874b7d74eadf45955be59785a59aa9f92445 Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Tue, 2 Mar 2021 22:39:03 +0100 Subject: [PATCH 3/4] Add nats and kafka executors to goreleaser Add documentation for nats and kafka executors Minor improvements to naming, log messages and documentation --- .goreleaser.yml | 10 ++++++ builtin/bins/dkron-executor-kafka/kafka.go | 26 ++++++-------- .../bins/dkron-executor-kafka/kafka_test.go | 10 +++--- builtin/bins/dkron-executor-nats/nats.go | 16 ++++----- builtin/bins/dkron-executor-nats/nats_test.go | 2 +- website/content/usage/executors/_index.md | 4 +-- website/content/usage/executors/http.md | 18 +++++----- website/content/usage/executors/kafka.md | 28 +++++++++++++++ website/content/usage/executors/nats.md | 35 +++++++++++++++++++ 9 files changed, 109 insertions(+), 40 deletions(-) create mode 100644 website/content/usage/executors/kafka.md create mode 100644 website/content/usage/executors/nats.md diff --git a/.goreleaser.yml b/.goreleaser.yml index 21aeb3e7e..ca26b537b 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -34,6 +34,16 @@ builds: id: dkron-executor-rabbitmq binary: dkron-executor-rabbitmq + - <<: *xbuild + main: ./builtin/bins/dkron-executor-nats/ + id: dkron-executor-nats + binary: dkron-executor-nats + + - <<: *xbuild + main: ./builtin/bins/dkron-executor-kafka/ + id: dkron-executor-kafka + binary: dkron-executor-nats + - <<: *xbuild main: ./builtin/bins/dkron-processor-files/ id: dkron-processor-files diff --git a/builtin/bins/dkron-executor-kafka/kafka.go b/builtin/bins/dkron-executor-kafka/kafka.go index b12ee0ad5..6dea74950 100644 --- a/builtin/bins/dkron-executor-kafka/kafka.go +++ b/builtin/bins/dkron-executor-kafka/kafka.go @@ -25,9 +25,9 @@ type Kafka struct { // Execute Process method of the plugin // "executor": "kafka", // "executor_config": { -// "url": "http://example.com", // kafka server url -// "message": "", // -// "topic": "publishTopic", // +// "brokerAddress": "192.168.59.103:9092", // kafka broker url +// "message": "", +// "topic": "publishTopic" // } func (s *Kafka) Execute(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelper) (*dktypes.ExecuteResponse, error) { @@ -39,7 +39,7 @@ func (s *Kafka) Execute(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelper) return resp, nil } -// ExecuteImpl do http request +// ExecuteImpl produce message on Kafka broker func (s *Kafka) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) { output, _ := circbuf.NewBuffer(maxBufSize) @@ -50,9 +50,9 @@ func (s *Kafka) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) { log.Printf("config %#v\n\n", args.Config) } - if args.Config["url"] == "" { + if args.Config["brokerAddress"] == "" { - return output.Bytes(), errors.New("url is empty") + return output.Bytes(), errors.New("brokerAddress is empty") } if args.Config["topic"] == "" { @@ -64,21 +64,20 @@ func (s *Kafka) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) { config.Producer.Return.Successes = true config.Producer.Return.Errors = true - // brokers := []string{"192.168.59.103:9092"} - brokers := []string{args.Config["url"]} + brokers := []string{args.Config["brokerAddress"]} producer, err := sarama.NewSyncProducer(brokers, config) if err != nil { // Should not reach here if debug { - log.Printf("request %#v\n\n", config) + log.Printf("sarama %#v\n\n", config) } return output.Bytes(), err } + defer producer.Close() - topic := args.Config["topic"] msg := &sarama.ProducerMessage{ - Topic: topic, + Topic: args.Config["topic"], Value: sarama.StringEncoder(args.Config["message"]), } @@ -87,10 +86,7 @@ func (s *Kafka) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) { if err != nil { return output.Bytes(), err } - defer func() { - producer.Close() - }() - output.Write([]byte("Result: success to publish data\n")) + output.Write([]byte("Result: successfully produced the message on Kafka broker\n")) return output.Bytes(), nil } diff --git a/builtin/bins/dkron-executor-kafka/kafka_test.go b/builtin/bins/dkron-executor-kafka/kafka_test.go index 1a746307a..79b171f46 100644 --- a/builtin/bins/dkron-executor-kafka/kafka_test.go +++ b/builtin/bins/dkron-executor-kafka/kafka_test.go @@ -7,14 +7,14 @@ import ( dktypes "github.com/distribworks/dkron/v3/plugin/types" ) -func TestPublishExecute(t *testing.T) { +func TestProduceExecute(t *testing.T) { pa := &dktypes.ExecuteRequest{ JobName: "testJob", Config: map[string]string{ - "topic": "test", - "url": "tesr", - "message": "{\"hello\":11}", - "debug": "true", + "topic": "test", + "brokerAddress": "testaddress", + "message": "{\"hello\":11}", + "debug": "true", }, } kafka := &Kafka{} diff --git a/builtin/bins/dkron-executor-nats/nats.go b/builtin/bins/dkron-executor-nats/nats.go index f8c398f2e..e918a0216 100644 --- a/builtin/bins/dkron-executor-nats/nats.go +++ b/builtin/bins/dkron-executor-nats/nats.go @@ -25,9 +25,9 @@ type Nats struct { // Execute Process method of the plugin // "executor": "nats", // "executor_config": { -// "url": "http://example.com", // nats server url -// "message": "", // -// "topic": "publishTopic", // +// "url": "tls://nats.demo.io:4443", // nats server url +// "message": "", +// "subject": "Subject", // "userName":"test@hbh.dfg", // "password":"dfdffs" // } @@ -57,18 +57,18 @@ func (s *Nats) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) { return output.Bytes(), errors.New("url is empty") } - if args.Config["topic"] == "" { - return output.Bytes(), errors.New("topic is empty") + if args.Config["subject"] == "" { + return output.Bytes(), errors.New("subject is empty") } nc, err := nats.Connect(args.Config["url"], nats.UserInfo(args.Config["userName"], args.Config["password"])) if err != nil { - return output.Bytes(), errors.New("Error At Nats Connection") + return output.Bytes(), errors.New("error connecting to NATS") } - nc.Publish(args.Config["topic"], []byte(args.Config["message"])) + nc.Publish(args.Config["subject"], []byte(args.Config["message"])) - output.Write([]byte("Result: success to publish data\n")) + output.Write([]byte("Result: Message successfully sent\n")) if debug { log.Printf("request %#v\n\n", nc) diff --git a/builtin/bins/dkron-executor-nats/nats_test.go b/builtin/bins/dkron-executor-nats/nats_test.go index 94a4889da..2958dad78 100644 --- a/builtin/bins/dkron-executor-nats/nats_test.go +++ b/builtin/bins/dkron-executor-nats/nats_test.go @@ -11,7 +11,7 @@ func TestPublishExecute(t *testing.T) { pa := &dktypes.ExecuteRequest{ JobName: "testJob", Config: map[string]string{ - "topic": "opcuaReadRequest", + "subject": "opcuaReadRequest", "url": "localhost:4222", "message": "{\"hello\":11}", "debug": "true", diff --git a/website/content/usage/executors/_index.md b/website/content/usage/executors/_index.md index e66345263..610ed0c1f 100644 --- a/website/content/usage/executors/_index.md +++ b/website/content/usage/executors/_index.md @@ -7,9 +7,9 @@ weight: 30 Executor plugins are the main mechanism of execution in Dkron. They implement different "types" of jobs in the sense that they can perform the most diverse actions on the target nodes. -For example, the built-in `shell` executor, will run the indicated command in the target node. +For example, the built-in `shell` executor, will run the indicated command on the target node. -New plugins will be added, or you can create new ones, to perform different tasks, as HTTP requests, Docker runs, anything that you can imagine. +New plugins will be added, or you can create new ones, to perform different tasks, such as HTTP requests, Docker runs, anything that you can imagine. {{% children %}} diff --git a/website/content/usage/executors/http.md b/website/content/usage/executors/http.md index 7dfad8f33..fc9da1608 100644 --- a/website/content/usage/executors/http.md +++ b/website/content/usage/executors/http.md @@ -9,18 +9,18 @@ HTTP executor can send a request to an HTTP endpoint Params: ``` -method: Request method in uppercase -url: Request url -headers: Json string, such as "[\"Content-Type: application/json\"]" -body: POST body -timeout: Request timeout, unit seconds +method: Request method in uppercase +url: Request url +headers: Json string, such as "[\"Content-Type: application/json\"]" +body: POST body +timeout: Request timeout, unit seconds expectCode: Expect response code, such as 200,206 expectBody: Expect response body, support regexp, such as /success/ -debug: Debug option, will log everything when this option is not empty -tlsNoVerifyPeer: false (default) or true. If true, disables verification of the remote SSL certificate's validity. -tlsCertificateFile: Path to the PEM file containing the client certificate. Optional. +debug: Debug option, will log everything when this option is not empty +tlsNoVerifyPeer: false (default) or true. If true, disables verification of the remote SSL certificate's validity. +tlsCertificateFile: Path to the PEM file containing the client certificate. Optional. tlsCertificateKeyFile: Path to the PEM file containing the client certificate private key. Optional. -tlsRootCAsFile: Path to the PEM file containing certificates to use as root CAs. Optional. +tlsRootCAsFile: Path to the PEM file containing certificates to use as root CAs. Optional. ``` Example diff --git a/website/content/usage/executors/kafka.md b/website/content/usage/executors/kafka.md new file mode 100644 index 000000000..01c62925a --- /dev/null +++ b/website/content/usage/executors/kafka.md @@ -0,0 +1,28 @@ + +--- +title: Kafka Executor +--- + +A basic Kafka executor that produces a message on a Kafka broker. + +## Configuration + +Params + +``` +brokerAddress: "IP:port" of the broker +message: The message to produce +topic: The Kafka topic for this message +debug: Turns on debugging output if not empty +``` + +Example + +```json +"executor": "kafka", +"executor_config": { + "brokerAddress": "localhost:9092", + "message": "My message", + "topic": "my_topic" +} +``` diff --git a/website/content/usage/executors/nats.md b/website/content/usage/executors/nats.md new file mode 100644 index 000000000..da3176fac --- /dev/null +++ b/website/content/usage/executors/nats.md @@ -0,0 +1,35 @@ + +--- +title: Shell Executor +--- + +The NATS executor sends a message to a NATS server/cluster. + +Currently, only username/password authentication is supported. + +## Configuration + +Params + +``` +url: Comma separated list of NATS server URLs +message: The message to send +subject: The subject to send the message to +userName: username for authentication +password: password for authentication +debug: If not empty, turns on debugging. Will log the NATS specific job config and the request sent. +``` + +Example + +```json +{ + "executor": "nats", + "executor_config": { + "url": "tls://nats.demo.io:4443", + "message": "the message", + "subject": "myfavoritesubject", + "userName":"someusername", + "password":"somepassword" +} +``` From fd511559e7b7f700f70f4dc0650513eaac4d05e1 Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Tue, 2 Mar 2021 23:21:24 +0100 Subject: [PATCH 4/4] Update .goreleaser.yml --- .goreleaser.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.goreleaser.yml b/.goreleaser.yml index ca26b537b..a08ff7013 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -42,7 +42,7 @@ builds: - <<: *xbuild main: ./builtin/bins/dkron-executor-kafka/ id: dkron-executor-kafka - binary: dkron-executor-nats + binary: dkron-executor-kafka - <<: *xbuild main: ./builtin/bins/dkron-processor-files/