Skip to content

Commit

Permalink
Added support for passing complete event payload from signal to trigg…
Browse files Browse the repository at this point in the history
…er. Updated docs (#94)
  • Loading branch information
VaibhavPage authored Oct 7, 2018
1 parent 3272f1f commit 9acb556
Show file tree
Hide file tree
Showing 55 changed files with 1,028 additions and 301 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ minikube start
eval $(minikube docker-env)
```

#### 5. Build the project & Docker images
#### 5. Build the project
```
make all
```
Expand Down
18 changes: 15 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ endif

# Build the project images
.DELETE_ON_ERROR:
all: sensor-linux sensor-controller-linux gateway-controller-linux gateway-transformer-linux webhook-linux calendar-linux artifact-linux nats-linux kafka-linux amqp-linux mqtt-linux
all: sensor-linux sensor-controller-linux gateway-controller-linux gateway-http-transformer-linux webhook-linux calendar-linux resource-linux artifact-linux nats-linux kafka-linux amqp-linux mqtt-linux gateway-processor-grpc-client-linux calendar-grpc-linux gateway-processor-http-client-linux calendar-http-linux

all-images: sensor-image sensor-controller-image gateway-controller-image gateway-http-transformer-image webhook-image calendar-image artifact-image nats-image kafka-image amqp-image mqtt-image gateway-processor-grpc-client-image calendar-grpc-image gateway-processor-http-client-image calendar-http-image
all-images: sensor-image sensor-controller-image gateway-controller-image gateway-http-transformer-image webhook-image calendar-image resource-image artifact-image nats-image kafka-image amqp-image mqtt-image gateway-processor-grpc-client-image calendar-grpc-image gateway-processor-http-client-image calendar-http-image

all-controller-images: sensor-controller-image gateway-controller-image

all-gateway-images: webhook-image calendar-image artifact-image nats-image kafka-image amqp-image mqtt-image
all-core-gateway-images: webhook-image calendar-image artifact-image nats-image kafka-image amqp-image mqtt-image resource-image

.PHONY: all clean test

Expand Down Expand Up @@ -115,6 +115,18 @@ calendar-image: calendar-linux
@if [ "$(DOCKER_PUSH)" = "true" ] ; then docker push $(IMAGE_PREFIX)calendar-gateway:$(IMAGE_TAG) ; fi


resource:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -ldflags '${LDFLAGS}' -o ${DIST_DIR}/resource-gateway ./gateways/core/resource/

resource-linux:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 make resource

resource-image: resource-linux
docker build -t $(IMAGE_PREFIX)resource-gateway:$(IMAGE_TAG) -f ./gateways/core/resource/Dockerfile .
@if [ "$(DOCKER_PUSH)" = "true" ] ; then docker push $(IMAGE_PREFIX)resource-gateway:$(IMAGE_TAG) ; fi



artifact:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -ldflags '${LDFLAGS}' -o ${DIST_DIR}/artifact-gateway ./gateways/core/artifact/

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Argo Events is an open source event-based dependency manager for Kubernetes. The
- Runtime agnostic. The first runtime and package agnostic event framework for Kubernetes.
- Containers. Designed from the ground-up as Kubernetes-native.
- Extremely lightweight. All gateways, with the exception of calendar-based gateways, are event-driven, meaning that there is no polling involved.
- Configurable. Select gateways you want to support, deploy those to Kubernetes and configure them on the fly
- Configurable. Configure gateways at runtime
- Scalable & Resilient.
- Simple or Complex dependencies. Manage everything from simple, linear, real-time dependencies to complex, multi-source, batch job dependencies.

Expand Down
2 changes: 1 addition & 1 deletion cmd/controllers/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func main() {

namespace, ok := os.LookupEnv(common.GatewayNamespace)
if !ok {
namespace = common.DefaultGatewayControllerNamespace
namespace = common.DefaultControllerNamespace
}

// create new gateway controller
Expand Down
2 changes: 1 addition & 1 deletion cmd/controllers/sensor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {

namespace, ok := os.LookupEnv(common.SensorNamespace)
if !ok {
namespace = common.DefaultGatewayControllerNamespace
namespace = common.DefaultControllerNamespace
}

// create a new sensor controller
Expand Down
3 changes: 3 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const (

// StandardTimeFormat is time format reference for golang
StandardTimeFormat = "2006-01-02 15:04:05"

// StandardYYYYMMDDFormat formats date in yyyy-mm-dd format
StandardYYYYMMDDFormat = "2006-01-02"
)

// SENSOR CONTROLLER CONSTANTS
Expand Down
16 changes: 6 additions & 10 deletions common/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,11 @@ import (
const namespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

var (
// DefaultSensorControllerNamespace is the default namespace where the sensor sensor-controller is installed
DefaultSensorControllerNamespace = "argo-events"

// Todo: Does this even have to be separate
// DefaultGatewayControllerNamespace is the default namespace where the sensor sensor-controller is installed
DefaultGatewayControllerNamespace = "argo-events"
// DefaultControllerNamespace is the default namespace where the sensor and gateways controllers are installed
DefaultControllerNamespace = "argo-events"

// ErrReadNamespace occurs when the namespace cannot be read from a Kubernetes pod's service account token
ErrReadNamespace = errors.New("Could not read namespace from service account secret")
ErrReadNamespace = errors.New("could not read namespace from service account secret")
)

func init() {
Expand Down Expand Up @@ -61,16 +57,16 @@ func RefreshNamespace() {
// 1 - env variable
nm, ok := os.LookupEnv(EnvVarNamespace)
if ok {
DefaultSensorControllerNamespace = nm
DefaultControllerNamespace = nm
return
}

// 2 - pod service account token
nm, err := detectNamespace()
if err == nil {
DefaultSensorControllerNamespace = nm
DefaultControllerNamespace = nm
}

// 3 - use the DefaultSensorControllerNamespace
// 3 - use the DefaultControllerNamespace
return
}
5 changes: 2 additions & 3 deletions common/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import (
func TestResolveNamespace(t *testing.T) {
defer os.Unsetenv(EnvVarNamespace)

RefreshNamespace()
assert.Equal(t, "argo-events", DefaultSensorControllerNamespace)
assert.Equal(t, "argo-events", DefaultControllerNamespace)

// TODO: now write the namespace file

Expand All @@ -37,5 +36,5 @@ func TestResolveNamespace(t *testing.T) {
}

RefreshNamespace()
assert.Equal(t, "test", DefaultSensorControllerNamespace)
assert.Equal(t, "test", DefaultControllerNamespace)
}
2 changes: 1 addition & 1 deletion controllers/gateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (c *GatewayController) updateConfig(cm *apiv1.ConfigMap) error {
return err
}
if config.Namespace == "" {
config.Namespace = common.DefaultGatewayControllerNamespace
config.Namespace = common.DefaultControllerNamespace
}
c.Config = config
return nil
Expand Down
2 changes: 1 addition & 1 deletion controllers/gateway/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@ func TestGatewayController_ResyncConfig(t *testing.T) {
assert.NotNil(t, cm)
assert.NotNil(t, gc.Config)
assert.NotEqual(t, gc.Config.Namespace, gc.ConfigMapNS)
assert.Equal(t, gc.Config.Namespace, common.DefaultGatewayControllerNamespace)
assert.Equal(t, gc.Config.Namespace, common.DefaultControllerNamespace)
}
2 changes: 1 addition & 1 deletion controllers/sensor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (c *SensorController) updateConfig(cm *apiv1.ConfigMap) error {
return err
}
if config.Namespace == "" {
config.Namespace = common.DefaultSensorControllerNamespace
config.Namespace = common.DefaultControllerNamespace
}
c.Config = config
return nil
Expand Down
6 changes: 2 additions & 4 deletions controllers/sensor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,18 @@ import (
"context"
"errors"
"time"

"fmt"
"log"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

base "github.com/argoproj/argo-events"
"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
sensorclientset "github.com/argoproj/argo-events/pkg/client/sensor/clientset/versioned"
"fmt"
"log"
)

const (
Expand Down
1 change: 0 additions & 1 deletion controllers/sensor/notification-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ func (se *sensorExecutionCtx) processSignal(gwEventWrapper *sensorEventWrapper)

// apply filters if any.
// error is thrown if some problem occurs during filtering the signal
// apply filters if any
ok, err := se.filterEvent(gwEventWrapper.signal.Filters, gwEventWrapper.event)
if err != nil {
se.log.Error().Err(err).Str("signal-name", gwEventWrapper.event.Context.Source.Host).Err(err).Msg("failed to apply filter")
Expand Down
10 changes: 3 additions & 7 deletions controllers/sensor/signal-filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,9 @@ func (se *sensorExecutionCtx) filterTime(timeFilter *v1alpha1.TimeFilter, eventT
if timeFilter != nil {
se.log.Info().Str("event-time", eventTime.String()).Msg("event time")
currentT := time.Now().UTC()
se.log.Info().Str("current-time", currentT.String()).Msg("current time")
currentMonth := fmt.Sprintf("%d", int(currentT.Month()))
if int(currentT.Month()) < 10 {
currentMonth = "0" + currentMonth
}
currentTStr := fmt.Sprintf("%d-%s-%d", currentT.Year(), currentMonth, currentT.Day())

currentT = time.Date(currentT.Year(), currentT.Month(), currentT.Day(), 0, 0, 0, 0, time.UTC)
currentTStr := currentT.Format(common.StandardYYYYMMDDFormat)
se.log.Info().Str("date", currentTStr).Msg("current date")
if timeFilter.Start != "" && timeFilter.Stop != "" {
se.log.Info().Str("start time format", currentTStr+" "+timeFilter.Start).Msg("start time format")
startTime, err := time.Parse(common.StandardTimeFormat, currentTStr+" "+timeFilter.Start)
Expand Down
9 changes: 3 additions & 6 deletions controllers/sensor/signal-filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ limitations under the License.
package sensor

import (
"fmt"
"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/stretchr/testify/assert"
Expand All @@ -36,12 +35,10 @@ func Test_filterTime(t *testing.T) {
Start: "10:11:00",
}
event := getCloudEvent()

currentT := time.Now().UTC()
currentMonth := fmt.Sprintf("%d", int(currentT.Month()))
if int(currentT.Month()) < 10 {
currentMonth = "0" + currentMonth
}
currentTStr := fmt.Sprintf("%d-%s-%d", currentT.Year(), currentMonth, currentT.Day())
currentT = time.Date(currentT.Year(), currentT.Month(), currentT.Day(), 0, 0, 0, 0, time.UTC)
currentTStr := currentT.Format(common.StandardYYYYMMDDFormat)
parsedTime, err := time.Parse(common.StandardTimeFormat, currentTStr+" 16:36:34")
assert.Nil(t, err)
event.Context.EventTime = metav1.MicroTime{
Expand Down
4 changes: 4 additions & 0 deletions controllers/sensor/trigger-params.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func resolveParamValue(src *v1alpha1.ResourceParameterSource, events map[string]
}
return "", err
}
// check if complete payload needs to be passed to the trigger
if src.Path == "" {
return string(js), nil
}
res := gjson.GetBytes(js, src.Path)
if res.Exists() {
return res.String(), nil
Expand Down
8 changes: 4 additions & 4 deletions controllers/sensor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ValidateSensor accepts a sensor and performs validation against it
Expand Down Expand Up @@ -83,8 +82,9 @@ func validateSignalFilter(filter v1alpha1.SignalFilter) error {
}

func validateSignalTimeFilter(tFilter *v1alpha1.TimeFilter) error {
currentT := metav1.Time{Time: time.Now().UTC()}
currentTStr := fmt.Sprintf("%d-%d-%d", currentT.Year(), int(currentT.Month()), currentT.Day())
currentT := time.Now().UTC()
currentT = time.Date(currentT.Year(), currentT.Month(), currentT.Day(), 0, 0, 0, 0, time.UTC)
currentTStr := currentT.Format(common.StandardYYYYMMDDFormat)
if tFilter.Start != "" && tFilter.Stop != "" {
startTime, err := time.Parse(common.StandardTimeFormat, currentTStr+" "+tFilter.Start)
if err != nil {
Expand All @@ -104,7 +104,7 @@ func validateSignalTimeFilter(tFilter *v1alpha1.TimeFilter) error {
return err
}
stopTime = stopTime.UTC()
if stopTime.Before(currentT.Time) {
if stopTime.Before(currentT.UTC()) {
return fmt.Errorf("invalid signal time filter: stop '%s' is before the current time '%s'", tFilter.Stop, currentT)
}
}
Expand Down
4 changes: 1 addition & 3 deletions docs/artifact-guide.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# Artifact Guide
This is a guide for help in utilizing artifacts within Argo Events. Sensors use artifacts for two purposes:
1. Object notifications for use in `Artifact` signals. (currently S3 bucket notifications are only supported)
2. A Resource Object store for use in `Resource` triggers
This is a guide for help in utilizing artifacts within Argo Events. Sensors use artifacts for Resource Object store for use in `Resource` triggers

## Inline
Inlined artifacts are included directly within the sensor resource and decoded as a string.
Expand Down
Loading

0 comments on commit 9acb556

Please sign in to comment.