Skip to content

Commit

Permalink
feat: jetstream eventbus controller implementation (#1705)
Browse files Browse the repository at this point in the history
  • Loading branch information
whynowy authored Mar 8, 2022
1 parent 4970987 commit 475baac
Show file tree
Hide file tree
Showing 42 changed files with 5,555 additions and 1,182 deletions.
360 changes: 360 additions & 0 deletions api/event-bus.html

Large diffs are not rendered by default.

350 changes: 350 additions & 0 deletions api/event-bus.md

Large diffs are not rendered by default.

109 changes: 109 additions & 0 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@
"io.argoproj.eventbus.v1alpha1.BusConfig": {
"description": "BusConfig has the finalized configuration for EventBus",
"properties": {
"jetstream": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.JetStreamConfig"
},
"nats": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.NATSConfig"
}
Expand Down Expand Up @@ -348,6 +351,9 @@
"io.argoproj.eventbus.v1alpha1.EventBusSpec": {
"description": "EventBusSpec refers to specification of eventbus resource",
"properties": {
"jetstream": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.JetStreamBus"
},
"nats": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.NATSBus",
"description": "NATS eventbus"
Expand All @@ -374,6 +380,109 @@
},
"type": "object"
},
"io.argoproj.eventbus.v1alpha1.JetStreamAuth": {
"properties": {
"token": {
"$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector",
"description": "Secret for auth token"
}
},
"type": "object"
},
"io.argoproj.eventbus.v1alpha1.JetStreamBus": {
"description": "JetStreamBus holds the JetStream EventBus information",
"properties": {
"affinity": {
"$ref": "#/definitions/io.k8s.api.core.v1.Affinity",
"description": "The pod's scheduling constraints More info: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/"
},
"containerTemplate": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.ContainerTemplate",
"description": "ContainerTemplate contains customized spec for Nats JetStream container"
},
"imagePullSecrets": {
"description": "ImagePullSecrets is an optional list of references to secrets in the same namespace to use for pulling any of the images used by this PodSpec. If specified, these secrets will be passed to individual puller implementations for them to use. For example, in the case of docker, only DockerConfig type secrets are honored. More info: https://kubernetes.io/docs/concepts/containers/images#specifying-imagepullsecrets-on-a-pod",
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.LocalObjectReference"
},
"type": "array",
"x-kubernetes-patch-merge-key": "name",
"x-kubernetes-patch-strategy": "merge"
},
"metadata": {
"$ref": "#/definitions/io.argoproj.common.Metadata",
"description": "Metadata sets the pods's metadata, i.e. annotations and labels"
},
"metricsContainerTemplate": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.ContainerTemplate",
"description": "MetricsContainerTemplate contains customized spec for metrics container"
},
"nodeSelector": {
"additionalProperties": {
"type": "string"
},
"description": "NodeSelector is a selector which must be true for the pod to fit on a node. Selector which must match a node's labels for the pod to be scheduled on that node. More info: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/",
"type": "object"
},
"persistence": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.PersistenceStrategy"
},
"priority": {
"description": "The priority value. Various system components use this field to find the priority of the Redis pod. When Priority Admission Controller is enabled, it prevents users from setting this field. The admission controller populates this field from PriorityClassName. The higher the value, the higher the priority. More info: https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/",
"format": "int32",
"type": "integer"
},
"priorityClassName": {
"description": "If specified, indicates the Redis pod's priority. \"system-node-critical\" and \"system-cluster-critical\" are two special keywords which indicate the highest priorities with the former being the highest priority. Any other name must be defined by creating a PriorityClass object with that name. If not specified, the pod priority will be default or zero if there is no default. More info: https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/",
"type": "string"
},
"reloaderContainerTemplate": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.ContainerTemplate",
"description": "ReloaderContainerTemplate contains customized spec for config reloader container"
},
"replicas": {
"description": "Redis StatefulSet size",
"format": "int32",
"type": "integer"
},
"securityContext": {
"$ref": "#/definitions/io.k8s.api.core.v1.PodSecurityContext",
"description": "SecurityContext holds pod-level security attributes and common container settings. Optional: Defaults to empty. See type description for default values of each field."
},
"serviceAccountName": {
"description": "ServiceAccountName to apply to the StatefulSet",
"type": "string"
},
"settings": {
"description": "JetStream configuration, if not specified, global settings in controller-config will be used. See https://docs.nats.io/running-a-nats-service/configuration#jetstream. Only configure \"max_memory_store\" or \"max_file_store\", do not set \"store_dir\" as it has been hardcoded.",
"type": "string"
},
"tolerations": {
"description": "If specified, the pod's tolerations.",
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.Toleration"
},
"type": "array"
},
"version": {
"description": "JetStream version, such as \"2.7.3\"",
"type": "string"
}
},
"type": "object"
},
"io.argoproj.eventbus.v1alpha1.JetStreamConfig": {
"properties": {
"auth": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.JetStreamAuth"
},
"url": {
"description": "JetStream (Nats) URL",
"type": "string"
}
},
"type": "object"
},
"io.argoproj.eventbus.v1alpha1.NATSBus": {
"description": "NATSBus holds the NATS eventbus information",
"properties": {
Expand Down
109 changes: 109 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,16 @@ const (
NATSStreamingRaftLeaseTimeout = "1s"
// Default NATS Streaming RAFT commit timeout
NATSStreamingRaftCommitTimeout = "100ms"

// Default EventBus name
DefaultEventBusName = "default"

// key of server auth secret
JetStreamServerAuthSecretKey = "auth"
// key of client auth secret
JetStreamClientAuthSecretKey = "client-auth"
// key of nats-js.conf in the configmap
JetStreamConfigMapKey = "nats-js"
)

// Sensor constants
Expand Down
21 changes: 21 additions & 0 deletions common/hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package common

import (
"crypto/sha256"
"encoding/hex"
)

func MustHash(v interface{}) string {
switch data := v.(type) {
case []byte:
hash := sha256.New()
if _, err := hash.Write(data); err != nil {
panic(err)
}
return hex.EncodeToString(hash.Sum(nil))
case string:
return MustHash([]byte(data))
default:
return MustHash([]byte(MustJSON(v)))
}
}
17 changes: 17 additions & 0 deletions common/hash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package common

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestMustHash(t *testing.T) {
assert.Equal(t, "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad", MustHash([]byte("abc")))
assert.Equal(t, "d4ffe8e9ee0b48eba716706123a7187f32eae3bdcb0e7763e41e533267bd8a53", MustHash("efg"))
assert.Equal(t, "a8e084ec42eff43acd61526bef35e33ddf7a8135d6aba3b140a5cae4c8c5e10b", MustHash(
struct {
A string
B string
}{A: "aAa", B: "bBb"}))
}
27 changes: 27 additions & 0 deletions common/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package common

import "encoding/json"

func MustJSON(in interface{}) string {
if data, err := json.Marshal(in); err != nil {
panic(err)
} else {
return string(data)
}
}

// MustUnJSON unmarshalls JSON or panics.
// v - must be []byte or string
// in - must be a pointer.
func MustUnJSON(v interface{}, in interface{}) {
switch data := v.(type) {
case []byte:
if err := json.Unmarshal(data, in); err != nil {
panic(err)
}
case string:
MustUnJSON([]byte(data), in)
default:
panic("unknown type")
}
}
17 changes: 17 additions & 0 deletions common/json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package common

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestMustJson(t *testing.T) {
assert.Equal(t, "1", MustJSON(1))
}

func TestUnJSON(t *testing.T) {
var in int
MustUnJSON("1", &in)
assert.Equal(t, 1, in)
}
17 changes: 17 additions & 0 deletions common/string.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package common

import (
"crypto/rand"
"math/big"
)

// generate a random string with given length
func RandomString(length int) string {
seeds := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
result := make([]byte, length)
for i := 0; i < length; i++ {
num, _ := rand.Int(rand.Reader, big.NewInt(int64(len(seeds))))
result[i] = seeds[num.Int64()]
}
return string(result)
}
12 changes: 12 additions & 0 deletions common/string_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package common

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestRandomString(t *testing.T) {
str := RandomString(20)
assert.Equal(t, 20, len(str))
}
Loading

0 comments on commit 475baac

Please sign in to comment.