From f808d96d2e317808c087ffa64959c201b94b852c Mon Sep 17 00:00:00 2001 From: Vaibhav Date: Sun, 8 Mar 2020 21:27:31 -0400 Subject: [PATCH] Kafka trigger (#530) * feat(kafka-trigger): added kafka trigger * feat(kafka-trigger): added kafka producers to sensor context * feat(kafka-trigger): added kafka trigger tests --- Gopkg.lock | 24 +- Gopkg.toml | 6 +- Makefile | 2 +- api/event-source.html | 28 +- api/event-source.md | 49 +- api/gateway.html | 10 +- api/gateway.md | 16 +- api/sensor.html | 253 +++++++++-- api/sensor.md | 419 +++++++++++++++--- .../v1alpha1/openapi_generated.go | 22 +- .../gateway/v1alpha1/openapi_generated.go | 8 +- pkg/apis/sensor/v1alpha1/openapi_generated.go | 200 +++++++-- pkg/apis/sensor/v1alpha1/types.go | 48 +- .../sensor/v1alpha1/zz_generated.deepcopy.go | 75 +++- .../clientset/versioned/clientset.go | 7 + .../eventsources/interface.go | 2 +- .../gateway/clientset/versioned/clientset.go | 7 + .../externalversions/gateway/interface.go | 2 +- .../sensor/clientset/versioned/clientset.go | 7 + .../externalversions/sensor/interface.go | 2 +- sensors/context.go | 4 + sensors/trigger.go | 10 + sensors/triggers/kafka/kafka.go | 180 ++++++++ sensors/triggers/kafka/kafka_test.go | 172 +++++++ 24 files changed, 1359 insertions(+), 194 deletions(-) create mode 100644 sensors/triggers/kafka/kafka.go create mode 100644 sensors/triggers/kafka/kafka_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 5180964c03..1e43067f34 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -16,7 +16,7 @@ "pubsub/internal/distribution", ] pruneopts = "UT" - revision = "386c9b99ffb6a082e5de780ccd38dd8ce5f2b994" + revision = "52020a63249f2e4201e59458009f5af55acf483d" [[projects]] digest = "1:487dc37a77bbba996bf4ddae0bff1c69fde98027d507e75eca317ca7c94483c3" @@ -111,10 +111,13 @@ revision = "de5bf2ad457846296e2031421a34e2568e304e35" [[projects]] - digest = "1:648c705cb6dfd4c285ad15eced5237a658a07b7de4c41c506a6e4a2a8ee93cb5" + digest = "1:c3fe4ae203a7fd972a43357906375909e0ea096a15f07588c72b1e1dbe6e587e" name = "github.com/Shopify/sarama" - packages = ["."] - pruneopts = "UT" + packages = [ + ".", + "mocks", + ] + pruneopts = "T" revision = "b3a812117be917e2dd4f8cbcf4ff861e3ae7ef38" version = "v1.26.1" @@ -357,20 +360,20 @@ version = "v0.19.3" [[projects]] - digest = "1:97899a475b12f80513ede1ea1806dc3bd91380e233d463aff8360a2ac1bfb3b4" + digest = "1:9e6992ea4c7005c711e3460e90d20e0c40cce6743bbdc65d12e8fe45e5b6beaf" name = "github.com/go-openapi/spec" packages = ["."] pruneopts = "UT" - revision = "dce82e4362c708e636da7feb0831bd29efb5cd97" - version = "v0.19.6" + revision = "1297e9a4ddf9325269fe013d7c1300aac3985f92" + version = "v0.19.7" [[projects]] - digest = "1:46055af5313a0deb06c423d849283aac853f554df5a3c42e705a81b8ebf7f1c4" + digest = "1:8b59d79dc97889c333cdad6fcbd9c47d4d9abc19a5ec0683bfa40e5a36d1b1b7" name = "github.com/go-openapi/swag" packages = ["."] pruneopts = "UT" - revision = "6a1eb9830f1c6b090dcd01cee6c3983604a2306f" - version = "v0.19.7" + revision = "59a9232e9392613952a0a4c90523c40c99140043" + version = "v0.19.8" [[projects]] digest = "1:e17320223c7c866dfbec405efe8abc8e383ccad131fb76ca7ff70580dafb9067" @@ -2034,6 +2037,7 @@ "github.com/Azure/azure-event-hubs-go", "github.com/Knetic/govaluate", "github.com/Shopify/sarama", + "github.com/Shopify/sarama/mocks", "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1", "github.com/aws/aws-sdk-go/aws", diff --git a/Gopkg.toml b/Gopkg.toml index 9840429636..478148b3d9 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -78,7 +78,7 @@ required = [ [[constraint]] name = "github.com/Shopify/sarama" - version = "v1.23.0" + version = "v1.26.1" [[constraint]] name = "github.com/stretchr/testify" @@ -160,6 +160,10 @@ required = [ name = "k8s.io/gengo" unused-packages = false + [[prune.project]] + name = "github.com/Shopify/sarama" + unused-packages = false + [[constraint]] name = "github.com/google/go-github" revision = "50be09d24ee31a2b0868265e76c24b9545a6eb7a" diff --git a/Makefile b/Makefile index fa161dca09..0abd7c944d 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ override LDFLAGS += \ # docker image publishing options DOCKER_PUSH?=true IMAGE_NAMESPACE?=argoproj -IMAGE_TAG?=v0.13.0-rc +IMAGE_TAG?=v0.13.0 BUILD_BINARY?=true ifeq (${DOCKER_PUSH},true) diff --git a/api/event-source.html b/api/event-source.html index 571ffb5d7f..6d81a3b64d 100644 --- a/api/event-source.html +++ b/api/event-source.html @@ -722,7 +722,7 @@

EventSourceSpec type
-github.com/argoproj/argo-events/pkg/apis/common.EventSourceType +Argo Events common.EventSourceType @@ -1932,6 +1932,18 @@

SNSEventSource

Region is AWS region

+ + +roleARN
+ +string + + + +(Optional) +

RoleARN is the Amazon Resource Name (ARN) of the role to assume.

+ +

SQSEventSource @@ -2023,6 +2035,18 @@

SQSEventSource

Namespace refers to Kubernetes namespace to read access related secret from.

+ + +roleARN
+ +string + + + +(Optional) +

RoleARN is the Amazon Resource Name (ARN) of the role to assume.

+ +

SlackEventSource @@ -2272,5 +2296,5 @@

StripeEventSource

Generated with gen-crd-api-reference-docs -on git commit 95e393b. +on git commit d0d11e4.

diff --git a/api/event-source.md b/api/event-source.md index bd9c92bd9e..f0ec3fc1ce 100644 --- a/api/event-source.md +++ b/api/event-source.md @@ -1397,8 +1397,7 @@ Generic event source -type
-github.com/argoproj/argo-events/pkg/apis/common.EventSourceType +type
Argo Events common.EventSourceType @@ -3833,6 +3832,28 @@ Region is AWS region + + + + +roleARN
string + + + + + +(Optional) + +

+ +RoleARN is the Amazon Resource Name (ARN) of the role to assume. + +

+ + + + + @@ -4012,6 +4033,28 @@ from. + + + + +roleARN
string + + + + + +(Optional) + +

+ +RoleARN is the Amazon Resource Name (ARN) of the role to assume. + +

+ + + + + @@ -4527,6 +4570,6 @@ all types of events will be processed. More info at

Generated with gen-crd-api-reference-docs on git -commit 95e393b. +commit d0d11e4.

diff --git a/api/gateway.html b/api/gateway.html index a74c260afe..435e76ee24 100644 --- a/api/gateway.html +++ b/api/gateway.html @@ -136,7 +136,7 @@

Gateway type
-github.com/argoproj/argo-events/pkg/apis/common.EventSourceType +Argo Events common.EventSourceType @@ -186,7 +186,7 @@

Gateway eventProtocol
-github.com/argoproj/argo-events/pkg/apis/common.EventProtocol +Argo Events common.EventProtocol @@ -304,7 +304,7 @@

GatewaySpec type
-github.com/argoproj/argo-events/pkg/apis/common.EventSourceType +Argo Events common.EventSourceType @@ -354,7 +354,7 @@

GatewaySpec eventProtocol
-github.com/argoproj/argo-events/pkg/apis/common.EventProtocol +Argo Events common.EventProtocol @@ -671,5 +671,5 @@

Subscribers

Generated with gen-crd-api-reference-docs -on git commit 95e393b. +on git commit d0d11e4.

diff --git a/api/gateway.md b/api/gateway.md index 08836c5f10..341aea9f68 100644 --- a/api/gateway.md +++ b/api/gateway.md @@ -270,8 +270,7 @@ configurations for the gateway -type
-github.com/argoproj/argo-events/pkg/apis/common.EventSourceType +type
Argo Events common.EventSourceType @@ -358,8 +357,8 @@ Port on which the gateway event source processor is running on. -eventProtocol
-github.com/argoproj/argo-events/pkg/apis/common.EventProtocol +eventProtocol
Argo Events common.EventProtocol + @@ -601,8 +600,7 @@ configurations for the gateway -type
-github.com/argoproj/argo-events/pkg/apis/common.EventSourceType +type
Argo Events common.EventSourceType @@ -689,8 +687,8 @@ Port on which the gateway event source processor is running on. -eventProtocol
-github.com/argoproj/argo-events/pkg/apis/common.EventProtocol +eventProtocol
Argo Events common.EventProtocol + @@ -1323,6 +1321,6 @@ NATS refers to the subscribers over NATS protocol.

Generated with gen-crd-api-reference-docs on git -commit 95e393b. +commit d0d11e4.

diff --git a/api/sensor.html b/api/sensor.html index c7e5a69cd1..c7536a4085 100644 --- a/api/sensor.html +++ b/api/sensor.html @@ -225,7 +225,7 @@

ArtifactLocation s3
-github.com/argoproj/argo-events/pkg/apis/common.S3Artifact +Argo Events common.S3Artifact @@ -735,7 +735,7 @@

EventDependencyFilter context
-github.com/argoproj/argo-events/pkg/apis/common.EventContext +Argo Events common.EventContext @@ -1090,8 +1090,8 @@

HTTPTrigger tls
- -HTTPTriggerTLS + +TLSConfig @@ -1143,14 +1143,23 @@

HTTPTrigger -

HTTPTriggerTLS +

JSONType +(string alias)

+

+(Appears on: +DataFilter) +

+

+

JSONType contains the supported JSON types for data filtering

+

+

K8sResourcePolicy

(Appears on: -HTTPTrigger) +TriggerPolicy)

-

HTTPTriggerTLS refers to TLS configuration for the HTTP client

+

K8sResourcePolicy refers to the policy used to check the state of K8s based triggers using using labels

@@ -1162,56 +1171,48 @@

HTTPTriggerTLS

-caCertPath
+labels
-string +map[string]string
-

CACertPath refers the file path that contains the CA cert.

+

Labels required to identify whether a resource is in success state

-clientCertPath
+backoff
-string +k8s.io/apimachinery/pkg/util/wait.Backoff
-

ClientCertPath refers the file path that contains client cert.

+

Backoff before checking resource state

-clientKeyPath
+errorOnBackoffTimeout
-string +bool
-

ClientKeyPath refers the file path that contains client key.

+

ErrorOnBackoffTimeout determines whether sensor should transition to error state if the trigger policy is unable to determine +the state of the resource

-

JSONType -(string alias)

-

-(Appears on: -DataFilter) -

-

-

JSONType contains the supported JSON types for data filtering

-

-

K8sResourcePolicy +

KafkaTrigger

(Appears on: -TriggerPolicy) +TriggerTemplate)

-

K8sResourcePolicy refers to the policy used to check the state of K8s based triggers using using labels

+

KafkaTrigger refers to the specification of the Kafka trigger.

@@ -1223,36 +1224,127 @@

K8sResourcePolicy

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -1436,7 +1528,7 @@

NodeStatus

+ + + +
-labels
+url
-map[string]string +string
-

Labels required to identify whether a resource is in success state

+

URL of the Kafka broker.

-backoff
+topic
-k8s.io/apimachinery/pkg/util/wait.Backoff +string
-

Backoff before checking resource state

+

Name of the topic. +More info at https://kafka.apache.org/documentation/#intro_topics

-errorOnBackoffTimeout
+partition
+ +int + +
+

Partition to write data to.

+
+parameters
+ + +[]TriggerParameter + + +
+

Parameters is the list of parameters that is applied to resolved Kafka trigger object.

+
+requiredAcks
+ +int + +
+

RequiredAcks used in producer to tell the broker how many replica acknowledgements +Defaults to 1 (Only wait for the leader to ack).

+
+compress
bool
-

ErrorOnBackoffTimeout determines whether sensor should transition to error state if the trigger policy is unable to determine -the state of the resource

+(Optional) +

Compress determines whether to compress message or not. +Defaults to false. +If set to true, compresses message using snappy compression.

+
+flushFrequency
+ +int + +
+(Optional) +

FlushFrequency refers to the frequency in milliseconds to flush batches. +Defaults to 500 milliseconds.

+
+tls
+ + +TLSConfig + + +
+(Optional) +

TLS configuration for the Kafka producer.

+
+payload
+ + +[]TriggerParameter + + +
+

Payload is the list of key-value extracted from an event payload to construct the request payload.

+
+partitioningKey
+ +string + +
+

The partitioning key for the messages put on the Kafka topic. +Defaults to broker url.

event
-github.com/argoproj/argo-events/pkg/apis/common.Event +Argo Events common.Event
@@ -1544,6 +1636,20 @@

OpenFaasTrigger

+username
+ + +Kubernetes core/v1.SecretKeySelector + + +
+(Optional) +

Username refers to the Kubernetes secret that holds the username required to log into the gateway.

+
password
@@ -2217,6 +2323,59 @@

Subscription

+

TLSConfig +

+

+(Appears on: +HTTPTrigger, +KafkaTrigger) +

+

+

TLSConfig refers to TLS configuration for the HTTP client

+

+ + + + + + + + + + + + + + + + + + + + + +
FieldDescription
+caCertPath
+ +string + +
+

CACertPath refers the file path that contains the CA cert.

+
+clientCertPath
+ +string + +
+

ClientCertPath refers the file path that contains client cert.

+
+clientKeyPath
+ +string + +
+

ClientKeyPath refers the file path that contains client key.

+

TimeFilter

@@ -2340,6 +2499,7 @@

TriggerParameter ArgoWorkflowTrigger, CustomTrigger, HTTPTrigger, +KafkaTrigger, OpenFaasTrigger, StandardK8sTrigger, Trigger) @@ -2681,7 +2841,7 @@

TriggerTemplate -customTrigger
+custom
CustomTrigger @@ -2693,6 +2853,19 @@

TriggerTemplate

CustomTrigger refers to the trigger designed to connect to a gRPC trigger server and execute a custom trigger.

+ + +kafka
+ +
+KafkaTrigger + + + + +

Kafka refers to the trigger designed to place messages on Kafka topic.

+ +

URLArtifact @@ -2739,5 +2912,5 @@

URLArtifact

Generated with gen-crd-api-reference-docs -on git commit 95e393b. +on git commit d0d11e4.

diff --git a/api/sensor.md b/api/sensor.md index 7ba1e5eb47..20819beab8 100644 --- a/api/sensor.md +++ b/api/sensor.md @@ -473,8 +473,7 @@ Description -s3
-github.com/argoproj/argo-events/pkg/apis/common.S3Artifact +s3
Argo Events common.S3Artifact @@ -1496,8 +1495,7 @@ Time filter on the event with escalation -context
-github.com/argoproj/argo-events/pkg/apis/common.EventContext +context
Argo Events common.EventContext @@ -2219,9 +2217,8 @@ construct the HTTP request payload. -tls
- HTTPTriggerTLS - +tls
+TLSConfig @@ -2313,16 +2310,41 @@ Timeout refers to the HTTP request timeout in seconds. Default value is -

+

+ +JSONType (string alias) + +

+ +

+ +

+ +(Appears on: +DataFilter) + +

+ +

+ +

+ +JSONType contains the supported JSON types for data filtering + +

+ +

+ +

-HTTPTriggerTLS +K8sResourcePolicy

(Appears on: -HTTPTrigger) +TriggerPolicy)

@@ -2330,7 +2352,8 @@ HTTPTriggerTLS

-HTTPTriggerTLS refers to TLS configuration for the HTTP client +K8sResourcePolicy refers to the policy used to check the state of K8s +based triggers using using labels

@@ -2364,7 +2387,7 @@ Description -caCertPath
string +labels
map\[string\]string @@ -2372,7 +2395,7 @@ Description

-CACertPath refers the file path that contains the CA cert. +Labels required to identify whether a resource is in success state

@@ -2384,7 +2407,8 @@ CACertPath refers the file path that contains the CA cert. -clientCertPath
string +backoff
k8s.io/apimachinery/pkg/util/wait.Backoff + @@ -2392,7 +2416,7 @@ CACertPath refers the file path that contains the CA cert.

-ClientCertPath refers the file path that contains client cert. +Backoff before checking resource state

@@ -2404,7 +2428,7 @@ ClientCertPath refers the file path that contains client cert. -clientKeyPath
string +errorOnBackoffTimeout
bool @@ -2412,7 +2436,9 @@ ClientCertPath refers the file path that contains client cert.

-ClientKeyPath refers the file path that contains client key. +ErrorOnBackoffTimeout determines whether sensor should transition to +error state if the trigger policy is unable to determine the state of +the resource

@@ -2424,18 +2450,16 @@ ClientKeyPath refers the file path that contains client key. -

- -JSONType (string alias) +

-

+KafkaTrigger

(Appears on: -DataFilter) +TriggerTemplate)

@@ -2443,73 +2467,180 @@ JSONType (string alias)

-JSONType contains the supported JSON types for data filtering +KafkaTrigger refers to the specification of the Kafka trigger.

-

+ -K8sResourcePolicy + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -5402,6 +5694,27 @@ trigger server and execute a custom trigger. + + + + + + + +
+ +Field + + + +Description + +
+ +url
string + +

-(Appears on: -TriggerPolicy) +URL of the Kafka broker.

+
+ +topic
string + +
+

+Name of the topic. More info at +https://kafka.apache.org/documentation/\#intro\_topics + +

+ +
+ +partition
int + +
+

-K8sResourcePolicy refers to the policy used to check the state of K8s -based triggers using using labels +Partition to write data to.

+
+ +parameters
+ \[\]TriggerParameter + + +
+ +

+ +Parameters is the list of parameters that is applied to resolved Kafka +trigger object. +

- + - + - - - + - + + + + + @@ -2550,9 +2685,29 @@ Backoff before checking resource state

-ErrorOnBackoffTimeout determines whether sensor should transition to -error state if the trigger policy is unable to determine the state of -the resource +Payload is the list of key-value extracted from an event payload to +construct the request payload. + +

+ + + + + + + + + + @@ -3158,6 +3312,31 @@ are applied to the HTTP trigger resource. + + + + + + + +
+ -Field +requiredAcks
int - +
+ -Description +

- +RequiredAcks used in producer to tell the broker how many replica +acknowledgements Defaults to 1 (Only wait for the leader to ack). + +

+ +
+ +compress
bool + +
+ +(Optional) + +

+ +Compress determines whether to compress message or not. Defaults to +false. If set to true, compresses message using snappy compression. + +

+ +
-labels
map\[string\]string +flushFrequency
int
+(Optional) +

-Labels required to identify whether a resource is in success state +FlushFrequency refers to the frequency in milliseconds to flush batches. +Defaults to 500 milliseconds.

@@ -2521,16 +2652,18 @@ Labels required to identify whether a resource is in success state
-backoff
k8s.io/apimachinery/pkg/util/wait.Backoff - +tls
+TLSConfig
+(Optional) +

-Backoff before checking resource state +TLS configuration for the Kafka producer.

@@ -2542,7 +2675,9 @@ Backoff before checking resource state
-errorOnBackoffTimeout
bool +payload
+ \[\]TriggerParameter +
+ +partitioningKey
string + +
+ +

+ +The partitioning key for the messages put on the Kafka topic. Defaults +to broker url.

@@ -2928,8 +3083,7 @@ events
-event
-github.com/argoproj/argo-events/pkg/apis/common.Event +event
Argo Events common.Event
+username
+ +Kubernetes core/v1.SecretKeySelector + +
+ +(Optional) + +

+ +Username refers to the Kubernetes secret that holds the username +required to log into the gateway. + +

+ +
+ password
Kubernetes core/v1.SecretKeySelector @@ -4436,6 +4615,118 @@ NATS refers to the NATS subscription of events for the sensor
+

+ +TLSConfig + +

+ +

+ +(Appears on: +HTTPTrigger, +KafkaTrigger) + +

+ +

+ +

+ +TLSConfig refers to TLS configuration for the HTTP client + +

+ +

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ +Field + + + +Description + +
+ +caCertPath
string + +
+ +

+ +CACertPath refers the file path that contains the CA cert. + +

+ +
+ +clientCertPath
string + +
+ +

+ +ClientCertPath refers the file path that contains client cert. + +

+ +
+ +clientKeyPath
string + +
+ +

+ +ClientKeyPath refers the file path that contains client key. + +

+ +
+

TimeFilter @@ -4687,6 +4978,7 @@ TriggerParameter ArgoWorkflowTrigger, CustomTrigger, HTTPTrigger, +KafkaTrigger, OpenFaasTrigger, StandardK8sTrigger, Trigger) @@ -5382,7 +5674,7 @@ with with on-the-fly constructable payload.

-customTrigger
+custom
CustomTrigger
+ +kafka
+ KafkaTrigger + +
+ +

+ +Kafka refers to the trigger designed to place messages on Kafka topic. + +

+ +
@@ -5502,6 +5815,6 @@ VerifyCert decides whether the connection is secure or not

Generated with gen-crd-api-reference-docs on git -commit 95e393b. +commit d0d11e4.

diff --git a/pkg/apis/eventsources/v1alpha1/openapi_generated.go b/pkg/apis/eventsources/v1alpha1/openapi_generated.go index 2da7f12003..9ca3218c36 100644 --- a/pkg/apis/eventsources/v1alpha1/openapi_generated.go +++ b/pkg/apis/eventsources/v1alpha1/openapi_generated.go @@ -289,14 +289,14 @@ func schema_pkg_apis_eventsources_v1alpha1_EventSource(ref common.ReferenceCallb Properties: map[string]spec.Schema{ "kind": { SchemaProps: spec.SchemaProps{ - Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds", + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", Type: []string{"string"}, Format: "", }, }, "apiVersion": { SchemaProps: spec.SchemaProps{ - Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources", + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", Type: []string{"string"}, Format: "", }, @@ -334,14 +334,14 @@ func schema_pkg_apis_eventsources_v1alpha1_EventSourceList(ref common.ReferenceC Properties: map[string]spec.Schema{ "kind": { SchemaProps: spec.SchemaProps{ - Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds", + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", Type: []string{"string"}, Format: "", }, }, "apiVersion": { SchemaProps: spec.SchemaProps{ - Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources", + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", Type: []string{"string"}, Format: "", }, @@ -1538,6 +1538,13 @@ func schema_pkg_apis_eventsources_v1alpha1_SNSEventSource(ref common.ReferenceCa Format: "", }, }, + "roleARN": { + SchemaProps: spec.SchemaProps{ + Description: "RoleARN is the Amazon Resource Name (ARN) of the role to assume.", + Type: []string{"string"}, + Format: "", + }, + }, }, Required: []string{"webhook", "topicArn", "region"}, }, @@ -1594,6 +1601,13 @@ func schema_pkg_apis_eventsources_v1alpha1_SQSEventSource(ref common.ReferenceCa Format: "", }, }, + "roleARN": { + SchemaProps: spec.SchemaProps{ + Description: "RoleARN is the Amazon Resource Name (ARN) of the role to assume.", + Type: []string{"string"}, + Format: "", + }, + }, }, Required: []string{"region", "queue", "waitTimeSeconds"}, }, diff --git a/pkg/apis/gateway/v1alpha1/openapi_generated.go b/pkg/apis/gateway/v1alpha1/openapi_generated.go index 255d1b8e76..b90ecdded4 100644 --- a/pkg/apis/gateway/v1alpha1/openapi_generated.go +++ b/pkg/apis/gateway/v1alpha1/openapi_generated.go @@ -78,14 +78,14 @@ func schema_pkg_apis_gateway_v1alpha1_Gateway(ref common.ReferenceCallback) comm Properties: map[string]spec.Schema{ "kind": { SchemaProps: spec.SchemaProps{ - Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds", + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", Type: []string{"string"}, Format: "", }, }, "apiVersion": { SchemaProps: spec.SchemaProps{ - Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources", + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", Type: []string{"string"}, Format: "", }, @@ -123,14 +123,14 @@ func schema_pkg_apis_gateway_v1alpha1_GatewayList(ref common.ReferenceCallback) Properties: map[string]spec.Schema{ "kind": { SchemaProps: spec.SchemaProps{ - Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds", + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", Type: []string{"string"}, Format: "", }, }, "apiVersion": { SchemaProps: spec.SchemaProps{ - Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources", + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", Type: []string{"string"}, Format: "", }, diff --git a/pkg/apis/sensor/v1alpha1/openapi_generated.go b/pkg/apis/sensor/v1alpha1/openapi_generated.go index 104f9b5305..69d282f364 100644 --- a/pkg/apis/sensor/v1alpha1/openapi_generated.go +++ b/pkg/apis/sensor/v1alpha1/openapi_generated.go @@ -45,8 +45,8 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.GitRemoteConfig": schema_pkg_apis_sensor_v1alpha1_GitRemoteConfig(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.HTTPSubscription": schema_pkg_apis_sensor_v1alpha1_HTTPSubscription(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.HTTPTrigger": schema_pkg_apis_sensor_v1alpha1_HTTPTrigger(ref), - "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.HTTPTriggerTLS": schema_pkg_apis_sensor_v1alpha1_HTTPTriggerTLS(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.K8sResourcePolicy": schema_pkg_apis_sensor_v1alpha1_K8sResourcePolicy(ref), + "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.KafkaTrigger": schema_pkg_apis_sensor_v1alpha1_KafkaTrigger(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.NATSSubscription": schema_pkg_apis_sensor_v1alpha1_NATSSubscription(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.NodeStatus": schema_pkg_apis_sensor_v1alpha1_NodeStatus(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.OpenFaasTrigger": schema_pkg_apis_sensor_v1alpha1_OpenFaasTrigger(ref), @@ -58,6 +58,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.StandardK8sTrigger": schema_pkg_apis_sensor_v1alpha1_StandardK8sTrigger(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.StatusPolicy": schema_pkg_apis_sensor_v1alpha1_StatusPolicy(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.Subscription": schema_pkg_apis_sensor_v1alpha1_Subscription(ref), + "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TLSConfig": schema_pkg_apis_sensor_v1alpha1_TLSConfig(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TimeFilter": schema_pkg_apis_sensor_v1alpha1_TimeFilter(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.Trigger": schema_pkg_apis_sensor_v1alpha1_Trigger(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TriggerParameter": schema_pkg_apis_sensor_v1alpha1_TriggerParameter(ref), @@ -831,7 +832,7 @@ func schema_pkg_apis_sensor_v1alpha1_HTTPTrigger(ref common.ReferenceCallback) c "tls": { SchemaProps: spec.SchemaProps{ Description: "TLS configuration for the HTTP client.", - Ref: ref("github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.HTTPTriggerTLS"), + Ref: ref("github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TLSConfig"), }, }, "method": { @@ -871,86 +872,158 @@ func schema_pkg_apis_sensor_v1alpha1_HTTPTrigger(ref common.ReferenceCallback) c }, }, Dependencies: []string{ - "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.HTTPTriggerTLS", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TriggerParameter"}, + "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TLSConfig", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TriggerParameter"}, } } -func schema_pkg_apis_sensor_v1alpha1_HTTPTriggerTLS(ref common.ReferenceCallback) common.OpenAPIDefinition { +func schema_pkg_apis_sensor_v1alpha1_K8sResourcePolicy(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ SchemaProps: spec.SchemaProps{ - Description: "HTTPTriggerTLS refers to TLS configuration for the HTTP client", + Description: "K8sResourcePolicy refers to the policy used to check the state of K8s based triggers using using labels", Type: []string{"object"}, Properties: map[string]spec.Schema{ - "caCertPath": { + "labels": { SchemaProps: spec.SchemaProps{ - Description: "CACertPath refers the file path that contains the CA cert.", - Type: []string{"string"}, - Format: "", + Description: "Labels required to identify whether a resource is in success state", + Type: []string{"object"}, + AdditionalProperties: &spec.SchemaOrBool{ + Allows: true, + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + }, }, }, - "clientCertPath": { + "backoff": { SchemaProps: spec.SchemaProps{ - Description: "ClientCertPath refers the file path that contains client cert.", - Type: []string{"string"}, - Format: "", + Description: "Backoff before checking resource state", + Ref: ref("k8s.io/apimachinery/pkg/util/wait.Backoff"), }, }, - "clientKeyPath": { + "errorOnBackoffTimeout": { SchemaProps: spec.SchemaProps{ - Description: "ClientKeyPath refers the file path that contains client key.", - Type: []string{"string"}, + Description: "ErrorOnBackoffTimeout determines whether sensor should transition to error state if the trigger policy is unable to determine the state of the resource", + Type: []string{"boolean"}, Format: "", }, }, }, - Required: []string{"caCertPath", "clientCertPath", "clientKeyPath"}, + Required: []string{"labels", "backoff", "errorOnBackoffTimeout"}, }, }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/util/wait.Backoff"}, } } -func schema_pkg_apis_sensor_v1alpha1_K8sResourcePolicy(ref common.ReferenceCallback) common.OpenAPIDefinition { +func schema_pkg_apis_sensor_v1alpha1_KafkaTrigger(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ SchemaProps: spec.SchemaProps{ - Description: "K8sResourcePolicy refers to the policy used to check the state of K8s based triggers using using labels", + Description: "KafkaTrigger refers to the specification of the Kafka trigger.", Type: []string{"object"}, Properties: map[string]spec.Schema{ - "labels": { + "url": { SchemaProps: spec.SchemaProps{ - Description: "Labels required to identify whether a resource is in success state", - Type: []string{"object"}, - AdditionalProperties: &spec.SchemaOrBool{ - Allows: true, + Description: "URL of the Kafka broker.", + Type: []string{"string"}, + Format: "", + }, + }, + "topic": { + SchemaProps: spec.SchemaProps{ + Description: "Name of the topic. More info at https://kafka.apache.org/documentation/#intro_topics", + Type: []string{"string"}, + Format: "", + }, + }, + "partition": { + SchemaProps: spec.SchemaProps{ + Description: "Partition to write data to.", + Type: []string{"integer"}, + Format: "int32", + }, + }, + "parameters": { + VendorExtensible: spec.VendorExtensible{ + Extensions: spec.Extensions{ + "x-kubernetes-list-type": "triggerParameters", + }, + }, + SchemaProps: spec.SchemaProps{ + Description: "Parameters is the list of parameters that is applied to resolved Kafka trigger object.", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ Schema: &spec.Schema{ SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", + Ref: ref("github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TriggerParameter"), }, }, }, }, }, - "backoff": { + "requiredAcks": { SchemaProps: spec.SchemaProps{ - Description: "Backoff before checking resource state", - Ref: ref("k8s.io/apimachinery/pkg/util/wait.Backoff"), + Description: "RequiredAcks used in producer to tell the broker how many replica acknowledgements Defaults to 1 (Only wait for the leader to ack).", + Type: []string{"integer"}, + Format: "int32", }, }, - "errorOnBackoffTimeout": { + "compress": { SchemaProps: spec.SchemaProps{ - Description: "ErrorOnBackoffTimeout determines whether sensor should transition to error state if the trigger policy is unable to determine the state of the resource", + Description: "Compress determines whether to compress message or not. Defaults to false. If set to true, compresses message using snappy compression.", Type: []string{"boolean"}, Format: "", }, }, + "flushFrequency": { + SchemaProps: spec.SchemaProps{ + Description: "FlushFrequency refers to the frequency in milliseconds to flush batches. Defaults to 500 milliseconds.", + Type: []string{"integer"}, + Format: "int32", + }, + }, + "tls": { + SchemaProps: spec.SchemaProps{ + Description: "TLS configuration for the Kafka producer.", + Ref: ref("github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TLSConfig"), + }, + }, + "payload": { + VendorExtensible: spec.VendorExtensible{ + Extensions: spec.Extensions{ + "x-kubernetes-list-type": "payloadParameters", + }, + }, + SchemaProps: spec.SchemaProps{ + Description: "Payload is the list of key-value extracted from an event payload to construct the request payload.", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Ref: ref("github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TriggerParameter"), + }, + }, + }, + }, + }, + "partitioningKey": { + SchemaProps: spec.SchemaProps{ + Description: "The partitioning key for the messages put on the Kafka topic. Defaults to broker url.", + Type: []string{"string"}, + Format: "", + }, + }, }, - Required: []string{"labels", "backoff", "errorOnBackoffTimeout"}, + Required: []string{"url", "topic", "partition", "payload"}, }, }, Dependencies: []string{ - "k8s.io/apimachinery/pkg/util/wait.Backoff"}, + "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TLSConfig", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TriggerParameter"}, } } @@ -1120,6 +1193,12 @@ func schema_pkg_apis_sensor_v1alpha1_OpenFaasTrigger(ref common.ReferenceCallbac }, }, }, + "username": { + SchemaProps: spec.SchemaProps{ + Description: "Username refers to the Kubernetes secret that holds the username required to log into the gateway.", + Ref: ref("k8s.io/api/core/v1.SecretKeySelector"), + }, + }, "password": { SchemaProps: spec.SchemaProps{ Description: "Password refers to the Kubernetes secret that holds the password required to log into the gateway.", @@ -1158,14 +1237,14 @@ func schema_pkg_apis_sensor_v1alpha1_Sensor(ref common.ReferenceCallback) common Properties: map[string]spec.Schema{ "kind": { SchemaProps: spec.SchemaProps{ - Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds", + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", Type: []string{"string"}, Format: "", }, }, "apiVersion": { SchemaProps: spec.SchemaProps{ - Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources", + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", Type: []string{"string"}, Format: "", }, @@ -1203,14 +1282,14 @@ func schema_pkg_apis_sensor_v1alpha1_SensorList(ref common.ReferenceCallback) co Properties: map[string]spec.Schema{ "kind": { SchemaProps: spec.SchemaProps{ - Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds", + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", Type: []string{"string"}, Format: "", }, }, "apiVersion": { SchemaProps: spec.SchemaProps{ - Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources", + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", Type: []string{"string"}, Format: "", }, @@ -1589,6 +1668,41 @@ func schema_pkg_apis_sensor_v1alpha1_Subscription(ref common.ReferenceCallback) } } +func schema_pkg_apis_sensor_v1alpha1_TLSConfig(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "TLSConfig refers to TLS configuration for the HTTP client", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "caCertPath": { + SchemaProps: spec.SchemaProps{ + Description: "CACertPath refers the file path that contains the CA cert.", + Type: []string{"string"}, + Format: "", + }, + }, + "clientCertPath": { + SchemaProps: spec.SchemaProps{ + Description: "ClientCertPath refers the file path that contains client cert.", + Type: []string{"string"}, + Format: "", + }, + }, + "clientKeyPath": { + SchemaProps: spec.SchemaProps{ + Description: "ClientKeyPath refers the file path that contains client key.", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"caCertPath", "clientCertPath", "clientKeyPath"}, + }, + }, + } +} + func schema_pkg_apis_sensor_v1alpha1_TimeFilter(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -1868,18 +1982,24 @@ func schema_pkg_apis_sensor_v1alpha1_TriggerTemplate(ref common.ReferenceCallbac Ref: ref("github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.AWSLambdaTrigger"), }, }, - "customTrigger": { + "custom": { SchemaProps: spec.SchemaProps{ Description: "CustomTrigger refers to the trigger designed to connect to a gRPC trigger server and execute a custom trigger.", Ref: ref("github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.CustomTrigger"), }, }, + "kafka": { + SchemaProps: spec.SchemaProps{ + Description: "Kafka refers to the trigger designed to place messages on Kafka topic.", + Ref: ref("github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.KafkaTrigger"), + }, + }, }, - Required: []string{"name"}, + Required: []string{"name", "kafka"}, }, }, Dependencies: []string{ - "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.AWSLambdaTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.ArgoWorkflowTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.CustomTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.HTTPTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.OpenFaasTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.StandardK8sTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TriggerSwitch"}, + "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.AWSLambdaTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.ArgoWorkflowTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.CustomTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.HTTPTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.KafkaTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.OpenFaasTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.StandardK8sTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TriggerSwitch"}, } } diff --git a/pkg/apis/sensor/v1alpha1/types.go b/pkg/apis/sensor/v1alpha1/types.go index e93c9346d1..4776500746 100644 --- a/pkg/apis/sensor/v1alpha1/types.go +++ b/pkg/apis/sensor/v1alpha1/types.go @@ -279,7 +279,10 @@ type TriggerTemplate struct { AWSLambda *AWSLambdaTrigger `json:"awsLambda,omitempty" protobuf:"bytes,6,opt,name=awsLambda"` // CustomTrigger refers to the trigger designed to connect to a gRPC trigger server and execute a custom trigger. // +optional - CustomTrigger *CustomTrigger `json:"customTrigger,omitempty" protobuf:"bytes,7,opt,name=customTrigger"` + CustomTrigger *CustomTrigger `json:"custom,omitempty" protobuf:"bytes,7,opt,name=custom"` + // Kafka refers to the trigger designed to place messages on Kafka topic. + // +optional. + Kafka *KafkaTrigger `json:"kafka" ` } // TriggerSwitch describes condition which must be satisfied in order to execute a trigger. @@ -332,7 +335,7 @@ type HTTPTrigger struct { Payload []TriggerParameter `json:"payload" protobuf:"bytes,2,rep,name=payload"` // TLS configuration for the HTTP client. // +optional - TLS *HTTPTriggerTLS `json:"tls,omitempty" protobuf:"bytes,3,opt,name=tls"` + TLS *TLSConfig `json:"tls,omitempty" protobuf:"bytes,3,opt,name=tls"` // Method refers to the type of the HTTP request. // Refer https://golang.org/src/net/http/method.go for more info. // Default value is POST. @@ -348,8 +351,8 @@ type HTTPTrigger struct { Timeout int `json:"timeout,omitempty" protobuf:"bytes,6,opt,name=timeout"` } -// HTTPTriggerTLS refers to TLS configuration for the HTTP client -type HTTPTriggerTLS struct { +// TLSConfig refers to TLS configuration for the HTTP client +type TLSConfig struct { // CACertPath refers the file path that contains the CA cert. CACertPath string `json:"caCertPath" protobuf:"bytes,1,name=caCertPath"` // ClientCertPath refers the file path that contains client cert. @@ -409,6 +412,43 @@ type AWSLambdaTrigger struct { Parameters []TriggerParameter `json:"parameters,omitempty" protobuf:"bytes,7,rep,name=parameters"` } +// KafkaTrigger refers to the specification of the Kafka trigger. +type KafkaTrigger struct { + // URL of the Kafka broker. + URL string `json:"url" protobuf:"bytes,1,name=url"` + // Name of the topic. + // More info at https://kafka.apache.org/documentation/#intro_topics + Topic string `json:"topic" protobuf:"bytes,2,name=topic"` + // Partition to write data to. + Partition int `json:"partition" protobuf:"bytes,3,name=partition"` + // Parameters is the list of parameters that is applied to resolved Kafka trigger object. + // +listType=triggerParameters + Parameters []TriggerParameter `json:"parameters,omitempty" protobuf:"bytes,4,rep,name=parameters"` + // RequiredAcks used in producer to tell the broker how many replica acknowledgements + // Defaults to 1 (Only wait for the leader to ack). + // +optional. + RequiredAcks int `json:"requiredAcks,omitempty" protobuf:"bytes,5,opt,name=requiredAcks"` + // Compress determines whether to compress message or not. + // Defaults to false. + // If set to true, compresses message using snappy compression. + // +optional + Compress bool `json:"compress,omitempty" protobuf:"bytes,6,opt,name=compress"` + // FlushFrequency refers to the frequency in milliseconds to flush batches. + // Defaults to 500 milliseconds. + // +optional + FlushFrequency int `json:"flushFrequency,omitempty" protobuf:"bytes,7,opt,name=flushFrequency"` + // TLS configuration for the Kafka producer. + // +optional + TLS *TLSConfig `json:"tls,omitempty" protobuf:"bytes,8,opt,name=tls"` + // Payload is the list of key-value extracted from an event payload to construct the request payload. + // +listType=payloadParameters + Payload []TriggerParameter `json:"payload" protobuf:"bytes,9,rep,name=payload"` + // The partitioning key for the messages put on the Kafka topic. + // Defaults to broker url. + // +optional. + PartitioningKey string `json:"partitioningKey,omitempty" protobuf:"bytes,10,opt,name=partitioningKey"` +} + // CustomTrigger refers to the specification of the custom trigger. type CustomTrigger struct { // ServerURL is the url of the gRPC server that executes custom trigger diff --git a/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go index c0ab76216b..65165d0465 100644 --- a/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go @@ -425,7 +425,7 @@ func (in *HTTPTrigger) DeepCopyInto(out *HTTPTrigger) { } if in.TLS != nil { in, out := &in.TLS, &out.TLS - *out = new(HTTPTriggerTLS) + *out = new(TLSConfig) **out = **in } if in.Parameters != nil { @@ -449,41 +449,60 @@ func (in *HTTPTrigger) DeepCopy() *HTTPTrigger { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *HTTPTriggerTLS) DeepCopyInto(out *HTTPTriggerTLS) { +func (in *K8sResourcePolicy) DeepCopyInto(out *K8sResourcePolicy) { *out = *in + if in.Labels != nil { + in, out := &in.Labels, &out.Labels + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + out.Backoff = in.Backoff return } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HTTPTriggerTLS. -func (in *HTTPTriggerTLS) DeepCopy() *HTTPTriggerTLS { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new K8sResourcePolicy. +func (in *K8sResourcePolicy) DeepCopy() *K8sResourcePolicy { if in == nil { return nil } - out := new(HTTPTriggerTLS) + out := new(K8sResourcePolicy) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *K8sResourcePolicy) DeepCopyInto(out *K8sResourcePolicy) { +func (in *KafkaTrigger) DeepCopyInto(out *KafkaTrigger) { *out = *in - if in.Labels != nil { - in, out := &in.Labels, &out.Labels - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val + if in.Parameters != nil { + in, out := &in.Parameters, &out.Parameters + *out = make([]TriggerParameter, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.TLS != nil { + in, out := &in.TLS, &out.TLS + *out = new(TLSConfig) + **out = **in + } + if in.Payload != nil { + in, out := &in.Payload, &out.Payload + *out = make([]TriggerParameter, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) } } - out.Backoff = in.Backoff return } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new K8sResourcePolicy. -func (in *K8sResourcePolicy) DeepCopy() *K8sResourcePolicy { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaTrigger. +func (in *KafkaTrigger) DeepCopy() *KafkaTrigger { if in == nil { return nil } - out := new(K8sResourcePolicy) + out := new(KafkaTrigger) in.DeepCopyInto(out) return out } @@ -546,6 +565,11 @@ func (in *OpenFaasTrigger) DeepCopyInto(out *OpenFaasTrigger) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Username != nil { + in, out := &in.Username, &out.Username + *out = new(v1.SecretKeySelector) + (*in).DeepCopyInto(*out) + } if in.Password != nil { in, out := &in.Password, &out.Password *out = new(v1.SecretKeySelector) @@ -823,6 +847,22 @@ func (in *Subscription) DeepCopy() *Subscription { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TLSConfig) DeepCopyInto(out *TLSConfig) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TLSConfig. +func (in *TLSConfig) DeepCopy() *TLSConfig { + if in == nil { + return nil + } + out := new(TLSConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TimeFilter) DeepCopyInto(out *TimeFilter) { *out = *in @@ -1004,6 +1044,11 @@ func (in *TriggerTemplate) DeepCopyInto(out *TriggerTemplate) { *out = new(CustomTrigger) (*in).DeepCopyInto(*out) } + if in.Kafka != nil { + in, out := &in.Kafka, &out.Kafka + *out = new(KafkaTrigger) + (*in).DeepCopyInto(*out) + } return } diff --git a/pkg/client/eventsources/clientset/versioned/clientset.go b/pkg/client/eventsources/clientset/versioned/clientset.go index 05e021eb56..7b26a8343c 100644 --- a/pkg/client/eventsources/clientset/versioned/clientset.go +++ b/pkg/client/eventsources/clientset/versioned/clientset.go @@ -19,6 +19,8 @@ limitations under the License. package versioned import ( + "fmt" + argoprojv1alpha1 "github.com/argoproj/argo-events/pkg/client/eventsources/clientset/versioned/typed/eventsources/v1alpha1" discovery "k8s.io/client-go/discovery" rest "k8s.io/client-go/rest" @@ -51,9 +53,14 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface { } // NewForConfig creates a new Clientset for the given config. +// If config's RateLimiter is not set and QPS and Burst are acceptable, +// NewForConfig will generate a rate-limiter in configShallowCopy. func NewForConfig(c *rest.Config) (*Clientset, error) { configShallowCopy := *c if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 { + if configShallowCopy.Burst <= 0 { + return nil, fmt.Errorf("Burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0") + } configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst) } var cs Clientset diff --git a/pkg/client/eventsources/informers/externalversions/eventsources/interface.go b/pkg/client/eventsources/informers/externalversions/eventsources/interface.go index 18b8d46292..bfcb4acf8e 100644 --- a/pkg/client/eventsources/informers/externalversions/eventsources/interface.go +++ b/pkg/client/eventsources/informers/externalversions/eventsources/interface.go @@ -16,7 +16,7 @@ limitations under the License. // Code generated by informer-gen. DO NOT EDIT. -package argoproj +package eventsources import ( v1alpha1 "github.com/argoproj/argo-events/pkg/client/eventsources/informers/externalversions/eventsources/v1alpha1" diff --git a/pkg/client/gateway/clientset/versioned/clientset.go b/pkg/client/gateway/clientset/versioned/clientset.go index 1f83220edd..a898c171d0 100644 --- a/pkg/client/gateway/clientset/versioned/clientset.go +++ b/pkg/client/gateway/clientset/versioned/clientset.go @@ -19,6 +19,8 @@ limitations under the License. package versioned import ( + "fmt" + argoprojv1alpha1 "github.com/argoproj/argo-events/pkg/client/gateway/clientset/versioned/typed/gateway/v1alpha1" discovery "k8s.io/client-go/discovery" rest "k8s.io/client-go/rest" @@ -51,9 +53,14 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface { } // NewForConfig creates a new Clientset for the given config. +// If config's RateLimiter is not set and QPS and Burst are acceptable, +// NewForConfig will generate a rate-limiter in configShallowCopy. func NewForConfig(c *rest.Config) (*Clientset, error) { configShallowCopy := *c if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 { + if configShallowCopy.Burst <= 0 { + return nil, fmt.Errorf("Burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0") + } configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst) } var cs Clientset diff --git a/pkg/client/gateway/informers/externalversions/gateway/interface.go b/pkg/client/gateway/informers/externalversions/gateway/interface.go index 7a05bf37f6..e9d8ebdaf8 100644 --- a/pkg/client/gateway/informers/externalversions/gateway/interface.go +++ b/pkg/client/gateway/informers/externalversions/gateway/interface.go @@ -16,7 +16,7 @@ limitations under the License. // Code generated by informer-gen. DO NOT EDIT. -package argoproj +package gateway import ( v1alpha1 "github.com/argoproj/argo-events/pkg/client/gateway/informers/externalversions/gateway/v1alpha1" diff --git a/pkg/client/sensor/clientset/versioned/clientset.go b/pkg/client/sensor/clientset/versioned/clientset.go index 1e30aae3fa..1bc973daac 100644 --- a/pkg/client/sensor/clientset/versioned/clientset.go +++ b/pkg/client/sensor/clientset/versioned/clientset.go @@ -19,6 +19,8 @@ limitations under the License. package versioned import ( + "fmt" + argoprojv1alpha1 "github.com/argoproj/argo-events/pkg/client/sensor/clientset/versioned/typed/sensor/v1alpha1" discovery "k8s.io/client-go/discovery" rest "k8s.io/client-go/rest" @@ -51,9 +53,14 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface { } // NewForConfig creates a new Clientset for the given config. +// If config's RateLimiter is not set and QPS and Burst are acceptable, +// NewForConfig will generate a rate-limiter in configShallowCopy. func NewForConfig(c *rest.Config) (*Clientset, error) { configShallowCopy := *c if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 { + if configShallowCopy.Burst <= 0 { + return nil, fmt.Errorf("Burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0") + } configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst) } var cs Clientset diff --git a/pkg/client/sensor/informers/externalversions/sensor/interface.go b/pkg/client/sensor/informers/externalversions/sensor/interface.go index 4bd95f6cb1..d958b095ec 100644 --- a/pkg/client/sensor/informers/externalversions/sensor/interface.go +++ b/pkg/client/sensor/informers/externalversions/sensor/interface.go @@ -16,7 +16,7 @@ limitations under the License. // Code generated by informer-gen. DO NOT EDIT. -package argoproj +package sensor import ( internalinterfaces "github.com/argoproj/argo-events/pkg/client/sensor/informers/externalversions/internalinterfaces" diff --git a/sensors/context.go b/sensors/context.go index de916d0af7..51caf3131f 100644 --- a/sensors/context.go +++ b/sensors/context.go @@ -16,6 +16,7 @@ limitations under the License. package sensors import ( + "github.com/Shopify/sarama" "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" sensorclientset "github.com/argoproj/argo-events/pkg/client/sensor/clientset/versioned" @@ -50,6 +51,8 @@ type SensorContext struct { customTriggerClients map[string]*grpc.ClientConn // http client to invoke openfaas functions. openfaasHttpClient *http.Client + // kafkaProducers holds references to the active kafka producers + kafkaProducers map[string]sarama.AsyncProducer } // NewSensorContext returns a new sensor execution context. @@ -66,5 +69,6 @@ func NewSensorContext(sensorClient sensorclientset.Interface, kubeClient kuberne openfaasHttpClient: &http.Client{ Timeout: time.Minute * 5, }, + kafkaProducers: make(map[string]sarama.AsyncProducer), } } diff --git a/sensors/trigger.go b/sensors/trigger.go index 2f77be4cdd..e464f7421a 100644 --- a/sensors/trigger.go +++ b/sensors/trigger.go @@ -21,6 +21,7 @@ import ( awslambda "github.com/argoproj/argo-events/sensors/triggers/aws-lambda" customtrigger "github.com/argoproj/argo-events/sensors/triggers/custom-trigger" "github.com/argoproj/argo-events/sensors/triggers/http" + "github.com/argoproj/argo-events/sensors/triggers/kafka" "github.com/argoproj/argo-events/sensors/triggers/openfaas" standardk8s "github.com/argoproj/argo-events/sensors/triggers/standard-k8s" ) @@ -64,6 +65,15 @@ func (sensorCtx *SensorContext) GetTrigger(trigger *v1alpha1.Trigger) Trigger { return awslambda.NewAWSLambdaTrigger(sensorCtx.KubeClient, sensorCtx.Sensor, trigger, sensorCtx.Logger) } + if trigger.Template.Kafka != nil { + result, err := kafka.NewKafkaTrigger(sensorCtx.Sensor, trigger, sensorCtx.kafkaProducers, sensorCtx.Logger) + if err != nil { + sensorCtx.Logger.WithError(err).WithField("trigger", trigger.Template.Name).Errorln("failed to invoke the trigger") + return nil + } + return result + } + if trigger.Template.CustomTrigger != nil { result, err := customtrigger.NewCustomTrigger(sensorCtx.Sensor, trigger, sensorCtx.Logger, sensorCtx.customTriggerClients) if err != nil { diff --git a/sensors/triggers/kafka/kafka.go b/sensors/triggers/kafka/kafka.go new file mode 100644 index 0000000000..8f4d8539c1 --- /dev/null +++ b/sensors/triggers/kafka/kafka.go @@ -0,0 +1,180 @@ +/* +Copyright 2020 BlackRock, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package kafka + +import ( + "crypto/tls" + "crypto/x509" + "encoding/json" + "io/ioutil" + "time" + + "github.com/Shopify/sarama" + "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" + "github.com/argoproj/argo-events/sensors/triggers" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +// KafkaTrigger describes the trigger to place messages on Kafka topic using a producer +type KafkaTrigger struct { + // Sensor object + Sensor *v1alpha1.Sensor + // Trigger reference + Trigger *v1alpha1.Trigger + // Kafka async producer + Producer sarama.AsyncProducer + // Logger to log stuff + Logger *logrus.Logger +} + +// NewKafkaTrigger returns a new kafka trigger context. +func NewKafkaTrigger(sensor *v1alpha1.Sensor, trigger *v1alpha1.Trigger, kafkaProducers map[string]sarama.AsyncProducer, logger *logrus.Logger) (*KafkaTrigger, error) { + kafkatrigger := trigger.Template.Kafka + + producer, ok := kafkaProducers[trigger.Template.Name] + if !ok { + config := sarama.NewConfig() + + if kafkatrigger.TLS != nil { + if kafkatrigger.TLS.ClientCertPath != "" && kafkatrigger.TLS.ClientKeyPath != "" && kafkatrigger.TLS.CACertPath != "" { + cert, err := tls.LoadX509KeyPair(kafkatrigger.TLS.ClientCertPath, kafkatrigger.TLS.ClientKeyPath) + if err != nil { + return nil, err + } + + caCert, err := ioutil.ReadFile(kafkatrigger.TLS.CACertPath) + if err != nil { + return nil, err + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + config.Net.TLS.Config = &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + InsecureSkipVerify: true, + } + config.Net.TLS.Enable = true + } + } + + if kafkatrigger.Compress { + config.Producer.Compression = sarama.CompressionSnappy + } + + ff := 500 + if kafkatrigger.FlushFrequency != 0 { + ff = kafkatrigger.FlushFrequency + } + config.Producer.Flush.Frequency = time.Duration(ff) + + ra := sarama.WaitForAll + if kafkatrigger.RequiredAcks != 0 { + ra = sarama.RequiredAcks(kafkatrigger.RequiredAcks) + } + config.Producer.RequiredAcks = ra + + producer, err := sarama.NewAsyncProducer([]string{kafkatrigger.URL}, config) + if err != nil { + return nil, err + } + + kafkaProducers[trigger.Template.Name] = producer + } + + return &KafkaTrigger{ + Sensor: sensor, + Trigger: trigger, + Producer: producer, + Logger: logger, + }, nil +} + +// FetchResource fetches the trigger. As the Kafka trigger is simply a Kafka producer, there +// is no need to fetch any resource from external source +func (t *KafkaTrigger) FetchResource() (interface{}, error) { + return t.Trigger.Template.Kafka, nil +} + +// ApplyResourceParameters applies parameters to the trigger resource +func (t *KafkaTrigger) ApplyResourceParameters(sensor *v1alpha1.Sensor, resource interface{}) (interface{}, error) { + fetchedResource, ok := resource.(*v1alpha1.KafkaTrigger) + if !ok { + return nil, errors.New("failed to interpret the fetched trigger resource") + } + + resourceBytes, err := json.Marshal(fetchedResource) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal the kafka trigger resource") + } + parameters := fetchedResource.Parameters + if parameters != nil { + updatedResourceBytes, err := triggers.ApplyParams(resourceBytes, parameters, triggers.ExtractEvents(sensor, parameters)) + if err != nil { + return nil, err + } + var ht *v1alpha1.KafkaTrigger + if err := json.Unmarshal(updatedResourceBytes, &ht); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal the updated kafka trigger resource after applying resource parameters") + } + return ht, nil + } + return resource, nil +} + +// Execute executes the trigger +func (t *KafkaTrigger) Execute(resource interface{}) (interface{}, error) { + trigger, ok := resource.(*v1alpha1.KafkaTrigger) + if !ok { + return nil, errors.New("failed to interpret the trigger resource") + } + + if trigger.Payload == nil { + return nil, errors.New("payload parameters are not specified") + } + + payload, err := triggers.ConstructPayload(t.Sensor, trigger.Payload) + if err != nil { + return nil, err + } + + pk := trigger.PartitioningKey + if pk == "" { + pk = trigger.URL + } + + t.Producer.Input() <- &sarama.ProducerMessage{ + Topic: trigger.Topic, + Key: sarama.StringEncoder(pk), + Value: sarama.ByteEncoder(payload), + Partition: int32(trigger.Partition), + Timestamp: time.Now().UTC(), + } + + t.Logger.WithFields(map[string]interface{}{ + "topic": trigger.Topic, + "partition": trigger.Partition, + }).Infoln("successfully produced a message") + + return nil, nil +} + +// ApplyPolicy applies policy on the trigger +func (t *KafkaTrigger) ApplyPolicy(resource interface{}) error { + return nil +} diff --git a/sensors/triggers/kafka/kafka_test.go b/sensors/triggers/kafka/kafka_test.go new file mode 100644 index 0000000000..040812639a --- /dev/null +++ b/sensors/triggers/kafka/kafka_test.go @@ -0,0 +1,172 @@ +/* +Copyright 2020 BlackRock, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package kafka + +import ( + "testing" + + "github.com/Shopify/sarama" + "github.com/Shopify/sarama/mocks" + "github.com/argoproj/argo-events/common" + apicommon "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var sensorObj = &v1alpha1.Sensor{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-sensor", + Namespace: "fake", + }, + Spec: v1alpha1.SensorSpec{ + Triggers: []v1alpha1.Trigger{ + { + Template: &v1alpha1.TriggerTemplate{ + Name: "fake-trigger", + Kafka: &v1alpha1.KafkaTrigger{ + URL: "fake-kafka-url", + Topic: "fake-topic", + Partition: 0, + Parameters: nil, + RequiredAcks: 1, + Compress: false, + FlushFrequency: 0, + TLS: nil, + Payload: nil, + PartitioningKey: "", + }, + }, + }, + }, + }, +} + +func getFakeKafkaTrigger(producers map[string]sarama.AsyncProducer) (*KafkaTrigger, error) { + return NewKafkaTrigger(sensorObj.DeepCopy(), sensorObj.Spec.Triggers[0].DeepCopy(), producers, common.NewArgoEventsLogger()) +} + +func TestKafkaTrigger_FetchResource(t *testing.T) { + producer := mocks.NewAsyncProducer(t, nil) + trigger, err := getFakeKafkaTrigger(map[string]sarama.AsyncProducer{ + "fake-trigger": producer, + }) + assert.Nil(t, err) + obj, err := trigger.FetchResource() + assert.Nil(t, err) + assert.NotNil(t, obj) + trigger1, ok := obj.(*v1alpha1.KafkaTrigger) + assert.Equal(t, true, ok) + assert.Equal(t, trigger.Trigger.Template.Kafka.URL, trigger1.URL) +} + +func TestKafkaTrigger_ApplyResourceParameters(t *testing.T) { + producer := mocks.NewAsyncProducer(t, nil) + trigger, err := getFakeKafkaTrigger(map[string]sarama.AsyncProducer{ + "fake-trigger": producer, + }) + assert.Nil(t, err) + id := trigger.Sensor.NodeID("fake-dependency") + trigger.Sensor.Status = v1alpha1.SensorStatus{ + Nodes: map[string]v1alpha1.NodeStatus{ + id: { + Name: "fake-dependency", + Type: v1alpha1.NodeTypeEventDependency, + ID: id, + Event: &apicommon.Event{ + Context: apicommon.EventContext{ + ID: "1", + Type: "webhook", + Source: "webhook-gateway", + DataContentType: "application/json", + SpecVersion: "0.3", + Subject: "example-1", + }, + Data: []byte(`{"url": "another-fake-kafka-url"}`), + }, + }, + }, + } + + defaultValue := "http://default.com" + + trigger.Trigger.Template.Kafka.Parameters = []v1alpha1.TriggerParameter{ + { + Src: &v1alpha1.TriggerParameterSource{ + DependencyName: "fake-dependency", + DataKey: "url", + Value: &defaultValue, + }, + Dest: "url", + }, + } + + resource, err := trigger.ApplyResourceParameters(trigger.Sensor, trigger.Trigger.Template.Kafka) + assert.Nil(t, err) + assert.NotNil(t, resource) + + updatedTrigger, ok := resource.(*v1alpha1.KafkaTrigger) + assert.Nil(t, err) + assert.Equal(t, true, ok) + assert.Equal(t, "another-fake-kafka-url", updatedTrigger.URL) +} + +func TestKafkaTrigger_Execute(t *testing.T) { + producer := mocks.NewAsyncProducer(t, nil) + trigger, err := getFakeKafkaTrigger(map[string]sarama.AsyncProducer{ + "fake-trigger": producer, + }) + assert.Nil(t, err) + id := trigger.Sensor.NodeID("fake-dependency") + trigger.Sensor.Status = v1alpha1.SensorStatus{ + Nodes: map[string]v1alpha1.NodeStatus{ + id: { + Name: "fake-dependency", + Type: v1alpha1.NodeTypeEventDependency, + ID: id, + Event: &apicommon.Event{ + Context: apicommon.EventContext{ + ID: "1", + Type: "webhook", + Source: "webhook-gateway", + DataContentType: "application/json", + SpecVersion: "0.3", + Subject: "example-1", + }, + Data: []byte(`{"message": "world"}`), + }, + }, + }, + } + defaultValue := "hello" + + trigger.Trigger.Template.Kafka.Payload = []v1alpha1.TriggerParameter{ + { + Src: &v1alpha1.TriggerParameterSource{ + DependencyName: "fake-dependency", + DataKey: "message", + Value: &defaultValue, + }, + Dest: "message", + }, + } + + producer.ExpectInputAndSucceed() + + result, err := trigger.Execute(trigger.Trigger.Template.Kafka) + assert.Nil(t, err) + assert.Nil(t, result) +}