diff --git a/CONTRIBUTING.adoc b/CONTRIBUTING.adoc index a4e97ba28..c3b9a06b3 100644 --- a/CONTRIBUTING.adoc +++ b/CONTRIBUTING.adoc @@ -111,9 +111,10 @@ Accessing the provided "address" in your web browser should display the Jaeger U ==== Storage configuration -There's a template under the `test` directory that can be used to setup an Elasticsearch cluster. Alternatively, the following command can be executed to install it: +There are a set of templates under the `test` directory that can be used to setup an Elasticsearch and/or Cassandra cluster. Alternatively, the following commands can be executed to install it: [source,bash] ---- make es +make cassandra ---- diff --git a/Makefile b/Makefile index ef10d8a3e..fd72a90c1 100644 --- a/Makefile +++ b/Makefile @@ -53,7 +53,7 @@ unit-tests: @go test $(PACKAGES) -cover -coverprofile=cover.out .PHONY: e2e-tests -e2e-tests: es crd build docker push +e2e-tests: cassandra es crd build docker push @echo Running end-to-end tests... @cp deploy/rbac.yaml deploy/test/namespace-manifests.yaml @echo "---" >> deploy/test/namespace-manifests.yaml @@ -68,6 +68,10 @@ run: crd es: @kubectl create -f ./test/elasticsearch.yml 2>&1 | grep -v "already exists" || true +.PHONY: cassandra +cassandra: + @kubectl create -f ./test/cassandra.yml 2>&1 | grep -v "already exists" || true + .PHONY: crd crd: @kubectl create -f deploy/crd.yaml 2>&1 | grep -v "already exists" || true diff --git a/README.adoc b/README.adoc index b46b35fb8..ff165c105 100644 --- a/README.adoc +++ b/README.adoc @@ -242,6 +242,53 @@ spec: fieldPath: status.hostIP ---- +== Schema migration + +=== Cassandra + +When the storage type is set to Cassandra, the operator will automatically create a batch job that creates the required schema for Jaeger to run. This batch job will block the Jaeger installation, so that it starts only after the schema is successfuly created. 
The creation of this batch job can be disabled by setting the `enabled` property to `false`: + +[source,yaml] +---- +apiVersion: io.jaegertracing/v1alpha1 +kind: Jaeger +metadata: + name: cassandra-without-create-schema +spec: + strategy: all-in-one + storage: + type: cassandra + cassandra-create-schema: + enabled: false # <1> +---- +<1> Defaults to `true` + +Further aspects of the batch job can be configured as well. An example with all the possible options is shown below: + +[source,yaml] +---- +apiVersion: io.jaegertracing/v1alpha1 +kind: Jaeger +metadata: + name: cassandra-with-create-schema +spec: + strategy: all-in-one # <1> + storage: + type: cassandra + options: # <2> + cassandra: + servers: cassandra + keyspace: jaeger_v1_datacenter3 + cassandra-create-schema: # <3> + datacenter: "datacenter3" + mode: "test" +---- +<1> The same works for `production` +<2> These options are for the regular Jaeger components, like `collector` and `query` +<3> The options for the `create-schema` job + +NOTE: the default create-schema job uses `MODE=prod`, which implies a replication factor of `2`, using `NetworkTopologyStrategy` as the class, effectively meaning that at least 3 nodes are required in the Cassandra cluster. If a `SimpleStrategy` is desired, set the mode to `test`, which then sets the replication factor of `1`. Refer to the link:https://github.com/jaegertracing/jaeger/blob/v1.7.0/plugin/storage/cassandra/schema/create.sh[create-schema script] for more details. 
+ == Removing an instance To remove an instance, just use the `delete` command with the file used for the instance creation: diff --git a/deploy/examples/with-cassandra.yaml b/deploy/examples/with-cassandra.yaml new file mode 100644 index 000000000..584c51c1b --- /dev/null +++ b/deploy/examples/with-cassandra.yaml @@ -0,0 +1,15 @@ +apiVersion: io.jaegertracing/v1alpha1 +kind: Jaeger +metadata: + name: with-cassandra +spec: + strategy: all-in-one + storage: + type: cassandra + options: + cassandra: + servers: cassandra + keyspace: jaeger_v1_datacenter3 + cassandra-create-schema: + datacenter: "datacenter3" + mode: "test" diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml index 05f4f7fae..f647aee6c 100644 --- a/deploy/rbac.yaml +++ b/deploy/rbac.yaml @@ -36,7 +36,12 @@ rules: - ingresses verbs: - "*" - +- apiGroups: + - batch + resources: + - jobs + verbs: + - "*" --- kind: RoleBinding diff --git a/pkg/apis/io/v1alpha1/options.go b/pkg/apis/io/v1alpha1/options.go index 196deaaa2..8dd09f19a 100644 --- a/pkg/apis/io/v1alpha1/options.go +++ b/pkg/apis/io/v1alpha1/options.go @@ -68,3 +68,9 @@ func (o *Options) ToArgs() []string { return nil } + +// Map returns a map representing the option entries. Items are flattened, with dots as separators. 
For instance +// an option "cassandra" with a nested "servers" object becomes an entry with the key "cassandra.servers" +func (o *Options) Map() map[string]string { + return o.opts +} diff --git a/pkg/apis/io/v1alpha1/options_test.go b/pkg/apis/io/v1alpha1/options_test.go index 4883f4783..4ad33fe6e 100644 --- a/pkg/apis/io/v1alpha1/options_test.go +++ b/pkg/apis/io/v1alpha1/options_test.go @@ -52,3 +52,9 @@ func TestMultipleSubValues(t *testing.T) { args := o.ToArgs() assert.Len(t, args, 3) } + +func TestExposedMap(t *testing.T) { + o := NewOptions(nil) + o.UnmarshalJSON([]byte(`{"cassandra": {"servers": "cassandra:9042"}}`)) + assert.Equal(t, "cassandra:9042", o.Map()["cassandra.servers"]) +} diff --git a/pkg/apis/io/v1alpha1/types.go b/pkg/apis/io/v1alpha1/types.go index 81bf8d683..24a65f1b8 100644 --- a/pkg/apis/io/v1alpha1/types.go +++ b/pkg/apis/io/v1alpha1/types.go @@ -74,6 +74,15 @@ type JaegerAgentSpec struct { // JaegerStorageSpec defines the common storage options to be used for the query and collector type JaegerStorageSpec struct { - Type string `json:"type"` // can be `memory` (default), `cassandra`, `elasticsearch`, `kafka` or `managed` - Options Options `json:"options"` + Type string `json:"type"` // can be `memory` (default), `cassandra`, `elasticsearch`, `kafka` or `managed` + Options Options `json:"options"` + CassandraCreateSchema JaegerCassandraCreateSchemaSpec `json:"cassandra-create-schema"` +} + +// JaegerCassandraCreateSchemaSpec holds the options related to the create-schema batch job +type JaegerCassandraCreateSchemaSpec struct { + Enabled *bool `json:"enabled"` + Image string `json:"image"` + Datacenter string `json:"datacenter"` + Mode string `json:"mode"` } diff --git a/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go index bead568c4..34d2456b5 100644 --- a/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go @@ -71,6 +71,22 @@ func (in 
*JaegerAllInOneSpec) DeepCopy() *JaegerAllInOneSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *JaegerCassandraCreateSchemaSpec) DeepCopyInto(out *JaegerCassandraCreateSchemaSpec) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JaegerCassandraCreateSchemaSpec. +func (in *JaegerCassandraCreateSchemaSpec) DeepCopy() *JaegerCassandraCreateSchemaSpec { + if in == nil { + return nil + } + out := new(JaegerCassandraCreateSchemaSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *JaegerCollectorSpec) DeepCopyInto(out *JaegerCollectorSpec) { *out = *in @@ -201,6 +217,7 @@ func (in *JaegerStatus) DeepCopy() *JaegerStatus { func (in *JaegerStorageSpec) DeepCopyInto(out *JaegerStorageSpec) { *out = *in in.Options.DeepCopyInto(&out.Options) + out.CassandraCreateSchema = in.CassandraCreateSchema return } diff --git a/pkg/cmd/start/main.go b/pkg/cmd/start/main.go index 7684da59c..58bcc47b0 100644 --- a/pkg/cmd/start/main.go +++ b/pkg/cmd/start/main.go @@ -44,6 +44,9 @@ func NewStartCommand() *cobra.Command { cmd.Flags().StringP("jaeger-all-in-one-image", "", "jaegertracing/all-in-one", "The Docker image for the Jaeger all-in-one") viper.BindPFlag("jaeger-all-in-one-image", cmd.Flags().Lookup("jaeger-all-in-one-image")) + cmd.Flags().StringP("jaeger-cassandra-schema-image", "", "jaegertracing/jaeger-cassandra-schema", "The Docker image for the Jaeger Cassandra Schema") + viper.BindPFlag("jaeger-cassandra-schema-image", cmd.Flags().Lookup("jaeger-cassandra-schema-image")) + return cmd } diff --git a/pkg/controller/all-in-one.go b/pkg/controller/all-in-one.go index 97682c442..ffcab13fb 100644 --- a/pkg/controller/all-in-one.go +++ b/pkg/controller/all-in-one.go @@ -5,9 +5,11 @@ import ( 
"github.com/operator-framework/operator-sdk/pkg/sdk" "github.com/sirupsen/logrus" + batchv1 "k8s.io/api/batch/v1" "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" "github.com/jaegertracing/jaeger-operator/pkg/deployment" + "github.com/jaegertracing/jaeger-operator/pkg/storage" ) type allInOneController struct { @@ -22,7 +24,7 @@ func newAllInOneController(ctx context.Context, jaeger *v1alpha1.Jaeger) *allInO } } -func (c allInOneController) Create() []sdk.Object { +func (c *allInOneController) Create() []sdk.Object { logrus.Debugf("Creating all-in-one for '%v'", c.jaeger.Name) dep := deployment.NewAllInOne(c.jaeger) @@ -43,7 +45,11 @@ func (c allInOneController) Create() []sdk.Object { return os } -func (c allInOneController) Update() []sdk.Object { +func (c *allInOneController) Update() []sdk.Object { logrus.Debug("Update isn't available for all-in-one") return []sdk.Object{} } + +func (c *allInOneController) Dependencies() []batchv1.Job { + return storage.Dependencies(c.jaeger) +} diff --git a/pkg/controller/all-in-one_test.go b/pkg/controller/all-in-one_test.go index 6d53e1eac..799c97ec3 100644 --- a/pkg/controller/all-in-one_test.go +++ b/pkg/controller/all-in-one_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" + "github.com/jaegertracing/jaeger-operator/pkg/storage" ) func init() { @@ -41,6 +42,12 @@ func TestUpdateAllInOneDeployment(t *testing.T) { assert.Len(t, objs, 0) } +func TestDelegateAllInOneDepedencies(t *testing.T) { + // for now, we just have storage dependencies + c := newAllInOneController(context.TODO(), v1alpha1.NewJaeger("TestDelegateAllInOneDepedencies")) + assert.Equal(t, c.Dependencies(), storage.Dependencies(c.jaeger)) +} + func assertDeploymentsAndServicesForAllInOne(t *testing.T, name string, objs []sdk.Object, hasDaemonSet bool) { if hasDaemonSet { assert.Len(t, objs, 7) diff --git a/pkg/controller/controller.go 
b/pkg/controller/controller.go index 20c5cfe8d..d7e6ad37f 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -6,12 +6,14 @@ import ( "github.com/operator-framework/operator-sdk/pkg/sdk" "github.com/sirupsen/logrus" + batchv1 "k8s.io/api/batch/v1" "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" ) // Controller knows what type of deployments to build based on a given spec type Controller interface { + Dependencies() []batchv1.Job Create() []sdk.Object Update() []sdk.Object } diff --git a/pkg/controller/production.go b/pkg/controller/production.go index 566872c5b..871665cef 100644 --- a/pkg/controller/production.go +++ b/pkg/controller/production.go @@ -5,9 +5,11 @@ import ( "github.com/operator-framework/operator-sdk/pkg/sdk" "github.com/sirupsen/logrus" + batchv1 "k8s.io/api/batch/v1" "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" "github.com/jaegertracing/jaeger-operator/pkg/deployment" + "github.com/jaegertracing/jaeger-operator/pkg/storage" ) type productionController struct { @@ -56,3 +58,7 @@ func (c *productionController) Update() []sdk.Object { logrus.Debug("Update isn't yet available") return []sdk.Object{} } + +func (c *productionController) Dependencies() []batchv1.Job { + return storage.Dependencies(c.jaeger) +} diff --git a/pkg/controller/production_test.go b/pkg/controller/production_test.go index a3054a14f..83d6ccf73 100644 --- a/pkg/controller/production_test.go +++ b/pkg/controller/production_test.go @@ -11,6 +11,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" + "github.com/jaegertracing/jaeger-operator/pkg/storage" ) func init() { @@ -77,6 +78,12 @@ func TestOptionsArePassed(t *testing.T) { } } +func TestDelegateProductionDepedencies(t *testing.T) { + // for now, we just have storage dependencies + c := newProductionController(context.TODO(), v1alpha1.NewJaeger("TestDelegateProductionDepedencies")) + 
assert.Equal(t, c.Dependencies(), storage.Dependencies(c.jaeger)) +} + func assertDeploymentsAndServicesForProduction(t *testing.T, name string, objs []sdk.Object, hasDaemonSet bool) { if hasDaemonSet { assert.Len(t, objs, 7) diff --git a/pkg/storage/cassandra_dependencies.go b/pkg/storage/cassandra_dependencies.go new file mode 100644 index 000000000..631cb342d --- /dev/null +++ b/pkg/storage/cassandra_dependencies.go @@ -0,0 +1,108 @@ +package storage + +import ( + "fmt" + + "github.com/sirupsen/logrus" + "github.com/spf13/viper" + batchv1 "k8s.io/api/batch/v1" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" +) + +func cassandraDeps(jaeger *v1alpha1.Jaeger) []batchv1.Job { + trueVar := true + + if jaeger.Spec.Storage.CassandraCreateSchema.Enabled == nil { + jaeger.Spec.Storage.CassandraCreateSchema.Enabled = &trueVar + } + + // if the create-schema job is disabled, return an empty list + if !*jaeger.Spec.Storage.CassandraCreateSchema.Enabled { + return []batchv1.Job{} + } + + if jaeger.Spec.Storage.CassandraCreateSchema.Datacenter == "" { + // the default in the create-schema is "dc1", but the default in Jaeger is "test"! We align with Jaeger here + logrus.WithField("instance", jaeger.Name).Info("Datacenter not specified. Using 'test' for the cassandra-create-schema job.") + jaeger.Spec.Storage.CassandraCreateSchema.Datacenter = "test" + } + + if jaeger.Spec.Storage.CassandraCreateSchema.Mode == "" { + logrus.WithField("instance", jaeger.Name).Info("Mode not specified. 
Using 'prod' for the cassandra-create-schema job.") + jaeger.Spec.Storage.CassandraCreateSchema.Mode = "prod" + } + + if jaeger.Spec.Storage.CassandraCreateSchema.Image == "" { + jaeger.Spec.Storage.CassandraCreateSchema.Image = fmt.Sprintf("%s:%s", viper.GetString("jaeger-cassandra-schema-image"), viper.GetString("jaeger-version")) + } + + host := jaeger.Spec.Storage.Options.Map()["cassandra.servers"] + if host == "" { + logrus.WithField("instance", jaeger.Name).Info("Cassandra hostname not specified. Using 'cassandra' for the cassandra-create-schema job.") + host = "cassandra" // this is the default in the image + } + + annotations := map[string]string{ + "prometheus.io/scrape": "false", + "sidecar.istio.io/inject": "false", + } + + // TODO: should this be configurable? Would we ever think that 2 minutes is OK for this job to complete? + deadline := int64(120) + + return []batchv1.Job{ + batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "batch/v1", + Kind: "Job", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-cassandra-schema-job", jaeger.Name), + Namespace: jaeger.Namespace, + OwnerReferences: []metav1.OwnerReference{ + metav1.OwnerReference{ + APIVersion: jaeger.APIVersion, + Kind: jaeger.Kind, + Name: jaeger.Name, + UID: jaeger.UID, + Controller: &trueVar, + }, + }, + }, + Spec: batchv1.JobSpec{ + ActiveDeadlineSeconds: &deadline, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: annotations, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Image: jaeger.Spec.Storage.CassandraCreateSchema.Image, + Name: fmt.Sprintf("%s-cassandra-schema", jaeger.Name), + Env: []v1.EnvVar{ + v1.EnvVar{ + Name: "CQLSH_HOST", + Value: host, + }, + v1.EnvVar{ + Name: "MODE", + Value: jaeger.Spec.Storage.CassandraCreateSchema.Mode, + }, + v1.EnvVar{ + Name: "DATACENTER", + Value: jaeger.Spec.Storage.CassandraCreateSchema.Datacenter, + }, + }, + }, + }, + RestartPolicy: v1.RestartPolicyOnFailure, + }, + }, + 
}, + }, + } +} diff --git a/pkg/storage/cassandra_dependencies_test.go b/pkg/storage/cassandra_dependencies_test.go new file mode 100644 index 000000000..c7bd1ba22 --- /dev/null +++ b/pkg/storage/cassandra_dependencies_test.go @@ -0,0 +1,34 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" +) + +func TestCassandraCreateSchemaDisabled(t *testing.T) { + falseVar := false + + jaeger := v1alpha1.NewJaeger("TestCassandraCreateSchemaDisabled") + jaeger.Spec.Storage.CassandraCreateSchema.Enabled = &falseVar + + assert.Len(t, cassandraDeps(jaeger), 0) +} + +func TestCassandraCreateSchemaEnabled(t *testing.T) { + trueVar := true + + jaeger := v1alpha1.NewJaeger("TestCassandraCreateSchemaEnabled") + jaeger.Spec.Storage.CassandraCreateSchema.Enabled = &trueVar + + assert.Len(t, cassandraDeps(jaeger), 1) +} + +func TestCassandraCreateSchemaEnabledNil(t *testing.T) { + jaeger := v1alpha1.NewJaeger("TestCassandraCreateSchemaEnabledNil") + + assert.Nil(t, jaeger.Spec.Storage.CassandraCreateSchema.Enabled) + assert.Len(t, cassandraDeps(jaeger), 1) +} diff --git a/pkg/storage/dependency.go b/pkg/storage/dependency.go new file mode 100644 index 000000000..81e3b0cb5 --- /dev/null +++ b/pkg/storage/dependency.go @@ -0,0 +1,18 @@ +package storage + +import ( + "strings" + + batchv1 "k8s.io/api/batch/v1" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" +) + +// Dependencies return a list of Jobs that have to be finished before the other components are deployed +func Dependencies(jaeger *v1alpha1.Jaeger) []batchv1.Job { + if strings.ToLower(jaeger.Spec.Storage.Type) == "cassandra" { + return cassandraDeps(jaeger) + } + + return []batchv1.Job{} +} diff --git a/pkg/storage/dependency_test.go b/pkg/storage/dependency_test.go new file mode 100644 index 000000000..94d28c18e --- /dev/null +++ b/pkg/storage/dependency_test.go @@ -0,0 +1,19 @@ +package storage + +import ( + 
"testing" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" + "github.com/stretchr/testify/assert" +) + +func TestDefaultDependencies(t *testing.T) { + jaeger := v1alpha1.NewJaeger("TestCassandraDependencies") + assert.Len(t, Dependencies(jaeger), 0) +} + +func TestCassandraDependencies(t *testing.T) { + jaeger := v1alpha1.NewJaeger("TestCassandraDependencies") + jaeger.Spec.Storage.Type = "CASSANDRA" // should be converted to lowercase + assert.Len(t, Dependencies(jaeger), 1) +} diff --git a/pkg/stub/handler.go b/pkg/stub/handler.go index d88d8d025..e0e792352 100644 --- a/pkg/stub/handler.go +++ b/pkg/stub/handler.go @@ -3,12 +3,15 @@ package stub import ( "context" "fmt" + "time" "github.com/operator-framework/operator-sdk/pkg/sdk" "github.com/sirupsen/logrus" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" "github.com/jaegertracing/jaeger-operator/pkg/controller" @@ -35,31 +38,22 @@ func (h *Handler) Handle(ctx context.Context, event sdk.Event) error { ctrl := controller.NewController(ctx, o) - objs := ctrl.Create() - created := false - for _, obj := range objs { - err := sdk.Create(obj) - if err != nil && !apierrors.IsAlreadyExists(err) { - logrus.Errorf("failed to create %v", obj) - return err - } + // wait for all the dependencies to succeed + if err := handleDependencies(ctrl); err != nil { + return err + } - if err == nil { - created = true - } + created, err := handleCreate(ctrl) + if err != nil { + return err } if created { - logrus.Infof("Configured %v", o.Name) + logrus.WithField("name", o.Name).Info("Configured Jaeger instance") } - objs = ctrl.Update() - for _, obj := range objs { - logrus.Debugf("Updating %v", obj) - if err := sdk.Update(obj); err != nil { - logrus.Errorf("failed to update %v", obj) - return err - } + if err := 
handleUpdate(ctrl); err != nil { + return err } // we store back the changed CR, so that what is stored reflects what is being used @@ -98,3 +92,75 @@ func (h *Handler) Handle(ctx context.Context, event sdk.Event) error { } return nil } + +func handleCreate(ctrl controller.Controller) (bool, error) { + objs := ctrl.Create() + created := false + for _, obj := range objs { + err := sdk.Create(obj) + if err != nil && !apierrors.IsAlreadyExists(err) { + logrus.Errorf("failed to create %v", obj) + return false, err + } + + if err == nil { + created = true + } + } + + return created, nil +} + +func handleUpdate(ctrl controller.Controller) error { + objs := ctrl.Update() + for _, obj := range objs { + logrus.Debugf("Updating %v", obj) + if err := sdk.Update(obj); err != nil { + logrus.Errorf("failed to update %v", obj) + return err + } + } + + return nil +} + +func handleDependencies(ctrl controller.Controller) error { + for _, dep := range ctrl.Dependencies() { + err := sdk.Create(&dep) + if err != nil && !apierrors.IsAlreadyExists(err) { + logrus.Errorf("failed to create %v", dep.Name) + return err + } + + batch := batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "batch/v1", + Kind: "Job", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: dep.Name, + Namespace: dep.Namespace, + }, + } + + // we probably want to add a couple of seconds to this deadline, but for now, this should be sufficient + deadline := time.Duration(*dep.Spec.ActiveDeadlineSeconds) + return wait.Poll(time.Second, deadline*time.Second, func() (done bool, err error) { + err = sdk.Get(&batch) + if err != nil { + logrus.Errorf("failed to get the status of the dependency %v", dep.Name) + return false, err + } + + // for now, we just assume each batch job has one pod + if batch.Status.Succeeded != 1 { + logrus.WithField("dependency", dep.Name).Info("Waiting for dependency to complete") + return false, nil + } + + return true, nil + }) + } + + return nil +} diff --git a/test/cassandra.yml 
b/test/cassandra.yml new file mode 100644 index 000000000..6b83502a6 --- /dev/null +++ b/test/cassandra.yml @@ -0,0 +1,93 @@ +apiVersion: v1 +kind: List +items: +- apiVersion: v1 + kind: Service + metadata: + name: cassandra + labels: + app: jaeger + name: jaeger-cassandra-service + jaeger-infra: cassandra-service + spec: + ports: + - port: 7000 + name: intra-node + - port: 7001 + name: tls-intra-node + - port: 7199 + name: jmx + - port: 9042 + name: cql + - port: 9160 + name: thrift + clusterIP: None + selector: + app: cassandra +- apiVersion: "apps/v1beta1" + kind: StatefulSet + metadata: + name: cassandra + labels: + app: jaeger + jaeger-infra: cassandra-statefulset + spec: + serviceName: cassandra + replicas: 3 + template: + metadata: + labels: + app: cassandra + jaeger-infra: cassandra-replica + spec: + terminationGracePeriodSeconds: 1800 + containers: + - name: cassandra + image: cassandra:3.11 + command: + - /docker-entrypoint.sh + - "-R" + ports: + - containerPort: 7000 + name: intra-node + - containerPort: 7001 + name: tls-intra-node + - containerPort: 7199 + name: jmx + - containerPort: 9042 + name: cql + - containerPort: 9160 + name: thrift + lifecycle: + preStop: + exec: + command: ["/bin/sh", "-c", "nodetool drain"] + env: + - name: MAX_HEAP_SIZE + value: 512M + - name: HEAP_NEWSIZE + value: 100M + - name: CASSANDRA_LISTEN_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: CASSANDRA_CLUSTER_NAME + value: "jaeger" + - name: CASSANDRA_DC + value: "datacenter1" + - name: CASSANDRA_RACK + value: "rack1" + - name: CASSANDRA_ENDPOINT_SNITCH + value: "GossipingPropertyFileSnitch" + - name: CASSANDRA_SEEDS + value: cassandra-0.cassandra + volumeMounts: + - name: cassandra-data + mountPath: /var/lib/cassandra + - name: cassandra-logs + mountPath: /var/log/cassandra + volumes: + - name: cassandra-data + emptyDir: {} + - name: cassandra-logs + emptyDir: {} diff --git a/test/e2e/cassandra.go b/test/e2e/cassandra.go new file mode 100644 index 
000000000..d740d87a7 --- /dev/null +++ b/test/e2e/cassandra.go @@ -0,0 +1,63 @@ +package e2e + +import ( + goctx "context" + "fmt" + "testing" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" + framework "github.com/operator-framework/operator-sdk/pkg/test" + "github.com/operator-framework/operator-sdk/pkg/test/e2eutil" + "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// Cassandra runs a test with Cassandra as the backing storage +func Cassandra(t *testing.T) { + ctx := prepare(t) + defer ctx.Cleanup() + + if err := cassandraTest(t, framework.Global, ctx); err != nil { + t.Fatal(err) + } +} + +func cassandraTest(t *testing.T, f *framework.Framework, ctx *framework.TestCtx) error { + cleanupOptions := &framework.CleanupOptions{TestContext: ctx, Timeout: timeout, RetryInterval: retryInterval} + namespace, err := ctx.GetNamespace() + if err != nil { + return fmt.Errorf("could not get namespace: %v", err) + } + + j := &v1alpha1.Jaeger{ + TypeMeta: metav1.TypeMeta{ + Kind: "Jaeger", + APIVersion: "io.jaegertracing/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "with-cassandra", + Namespace: namespace, + }, + Spec: v1alpha1.JaegerSpec{ + Strategy: "all-in-one", + AllInOne: v1alpha1.JaegerAllInOneSpec{}, + Storage: v1alpha1.JaegerStorageSpec{ + Type: "cassandra", + Options: v1alpha1.NewOptions(map[string]interface{}{"cassandra.servers": "cassandra.default.svc"}), + }, + }, + } + + logrus.Infof("passing %v", j) + err = f.Client.Create(goctx.TODO(), j, cleanupOptions) + if err != nil { + return err + } + + err = WaitForJob(t, f.KubeClient, namespace, "with-cassandra-cassandra-schema-job", retryInterval, timeout) + if err != nil { + return err + } + + return e2eutil.WaitForDeployment(t, f.KubeClient, namespace, "with-cassandra", 1, retryInterval, timeout) +} diff --git a/test/e2e/jaeger_test.go b/test/e2e/jaeger_test.go index 74c7f4e14..401ee1858 100644 --- a/test/e2e/jaeger_test.go +++ 
b/test/e2e/jaeger_test.go @@ -37,6 +37,7 @@ func TestJaeger(t *testing.T) { t.Run("daemonset", DaemonSet) t.Run("sidecar", Sidecar) + t.Run("cassandra", Cassandra) }) } diff --git a/test/e2e/wait_util.go b/test/e2e/wait_util.go index a638de734..39d286c8a 100644 --- a/test/e2e/wait_util.go +++ b/test/e2e/wait_util.go @@ -87,3 +87,29 @@ func WaitForIngress(t *testing.T, kubeclient kubernetes.Interface, namespace, na t.Logf("Ingress available\n") return nil } + +// WaitForJob checks to see if a given job has completed successfully +// See #WaitForDeployment for the full semantics +func WaitForJob(t *testing.T, kubeclient kubernetes.Interface, namespace, name string, retryInterval, timeout time.Duration) error { + err := wait.Poll(retryInterval, timeout, func() (done bool, err error) { + job, err := kubeclient.BatchV1().Jobs(namespace).Get(name, metav1.GetOptions{IncludeUninitialized: true}) + if err != nil { + if apierrors.IsNotFound(err) { + t.Logf("Waiting for availability of %s job\n", name) + return false, nil + } + return false, err + } + + if job.Status.Succeeded > 0 && job.Status.Failed == 0 && job.Status.Active == 0 { + return true, nil + } + t.Logf("Waiting for job %s to succeed. Succeeded: %d, failed: %d, active: %d\n", name, job.Status.Succeeded, job.Status.Failed, job.Status.Active) + return false, nil + }) + if err != nil { + return err + } + t.Logf("Jobs succeeded\n") + return nil +}