diff --git a/plugins/source-kafka/README.md b/plugins/source-kafka/README.md index 3958e8c2..1b461878 100644 --- a/plugins/source-kafka/README.md +++ b/plugins/source-kafka/README.md @@ -68,3 +68,22 @@ Flags: -s, --sink string Addressable sink for events --topics string Topics to consume messages from ``` + +#### `kn source_kafka delete` + +``` +$ kn source_kafka delete --help +delete a Kafka source + +Usage: + kafka delete NAME [flags] + +Examples: +#Deletes a Kafka source with name 'mykafkasrc' +kn source_kafka delete mykafkasrc + +Flags: + -A, --all-namespaces If present, list the requested object(s) across all namespaces. Namespace in current context is ignored even if specified with --namespace. + -h, --help help for delete + -n, --namespace string Specify the namespace to operate in. +``` diff --git a/plugins/source-kafka/pkg/client/client.go b/plugins/source-kafka/pkg/client/client.go index 5da6707d..4401adf5 100644 --- a/plugins/source-kafka/pkg/client/client.go +++ b/plugins/source-kafka/pkg/client/client.go @@ -42,15 +42,15 @@ func NewKafkaSourceClient(kafkaParams *types.KafkaSourceParams, c clientv1alpha1 } } -func (client *kafkaSourceClient) KnSourceParams() *sourcetypes.KnSourceParams { - return client.kafkaSourceParams.KnSourceParams +func (c *kafkaSourceClient) KnSourceParams() *sourcetypes.KnSourceParams { + return c.kafkaSourceParams.KnSourceParams } -func (client *kafkaSourceClient) KafkaSourceParams() *types.KafkaSourceParams { - return client.kafkaSourceParams +func (c *kafkaSourceClient) KafkaSourceParams() *types.KafkaSourceParams { + return c.kafkaSourceParams } -//CreateKafkaSource is used to create an instance of ApiServerSource +//CreateKafkaSource is used to create an instance of KafkaSource func (c *kafkaSourceClient) CreateKafkaSource(kafkaSource *v1alpha1.KafkaSource) error { _, err := c.client.KafkaSources(c.namespace).Create(kafkaSource) if err != nil { @@ -60,6 +60,16 @@ func (c *kafkaSourceClient) CreateKafkaSource(kafkaSource *v1alpha1.KafkaSource) return nil } +//DeleteKafkaSource is used to create an instance of KafkaSource +func (c *kafkaSourceClient) DeleteKafkaSource(name string) error { + err := c.client.KafkaSources(c.namespace).Delete(name, &metav1.DeleteOptions{}) + if err != nil { + return knerrors.GetError(err) + } + + return nil +} + // Return the client's namespace func (c *kafkaSourceClient) Namespace() string { return c.namespace diff --git a/plugins/source-kafka/pkg/client/client_test.go b/plugins/source-kafka/pkg/client/client_test.go index 188bdcf9..8ff82874 100644 --- a/plugins/source-kafka/pkg/client/client_test.go +++ b/plugins/source-kafka/pkg/client/client_test.go @@ -50,6 +50,16 @@ func TestCreateKafka(t *testing.T) { assert.NilError(t, err) } +func TestDeleteKafka(t *testing.T) { + fakeE := fake.FakeSourcesV1alpha1{Fake: &client_testing.Fake{}} + cli := NewKafkaSourceClient(&types.KafkaSourceParams{}, &fakeE, "fake-namespace") + objNew := newKafkaSource("samplekafka") + err := cli.CreateKafkaSource(objNew) + assert.NilError(t, err) + err = cli.DeleteKafkaSource("samplekafka") + assert.NilError(t, err) +} + func newKafkaSource(name string) *v1alpha1.KafkaSource { return NewKafkaSourceBuilder(name). BootstrapServers("test.server.org"). diff --git a/plugins/source-kafka/pkg/factories/command_factory.go b/plugins/source-kafka/pkg/factories/command_factory.go index d9aa60f2..55336e1f 100644 --- a/plugins/source-kafka/pkg/factories/command_factory.go +++ b/plugins/source-kafka/pkg/factories/command_factory.go @@ -63,7 +63,7 @@ func (f *kafkaSourceCommandFactory) CreateCommand() *cobra.Command { createCmd := f.defaultCommandFactory.CreateCommand() createCmd.Short = "create NAME" createCmd.Example = `#Creates a new Kafka source named as 'mykafkasrc' which subscribes a Kafka server 'my-cluster-kafka-bootstrap.kafka.svc:9092' at topic 'test-topic' using the consumer group ID 'test-consumer-group' and sends the event messages to service 'event-display' -kn source_kafka create kafka-name --servers my-cluster-kafka-bootstrap.kafka.svc:9092 --topics test-topic --consumergroup test-consumer-group --sink svc:event-display` +kn source_kafka create mykafkasrc --servers my-cluster-kafka-bootstrap.kafka.svc:9092 --topics test-topic --consumergroup test-consumer-group --sink svc:event-display` return createCmd } @@ -71,8 +71,8 @@ func (f *kafkaSourceCommandFactory) DeleteCommand() *cobra.Command { deleteCmd := f.defaultCommandFactory.DeleteCommand() deleteCmd.Short = "delete NAME" deleteCmd.Long = "delete a Kafka source" - deleteCmd.Example = `#Deletes a Kafka source with NAME -kn source_kafka delete kafka-name` + deleteCmd.Example = `#Deletes a Kafka source with name 'mykafkasrc' +kn source_kafka delete mykafkasrc` return deleteCmd } diff --git a/plugins/source-kafka/pkg/factories/rune_factory.go b/plugins/source-kafka/pkg/factories/rune_factory.go index 4d0b344f..f06df2ff 100644 --- a/plugins/source-kafka/pkg/factories/rune_factory.go +++ b/plugins/source-kafka/pkg/factories/rune_factory.go @@ -68,7 +68,6 @@ func (f *kafkaSourceRunEFactory) CreateRunE() sourcetypes.RunE { return err } f.kafkaSourceClient = f.KafkaSourceClient(namespace) - fmt.Printf("%s RunE function called for Kafka source: args: %#v, client: %#v\n", cmd.Name(), args, f.kafkaSourceClient) if len(args) != 1 { return errors.New("requires the name of the source to create as single argument") @@ -110,8 +109,31 @@ func (f *kafkaSourceRunEFactory) CreateRunE() sourcetypes.RunE { func (f *kafkaSourceRunEFactory) DeleteRunE() sourcetypes.RunE { return func(cmd *cobra.Command, args []string) error { - fmt.Printf("%s RunE function called for Kafka source: args: %#v, client: %#v\n", cmd.Name(), args, f.kafkaSourceClient) - return nil + var err error + namespace, err := f.KnSourceParams().GetNamespace(cmd) + if err != nil { + return err + } + f.kafkaSourceClient = f.KafkaSourceClient(namespace) + + if len(args) != 1 { + return errors.New("requires the name of the source to create as single argument") + } + name := args[0] + + err = f.kafkaSourceClient.DeleteKafkaSource(name) + + if err != nil { + return fmt.Errorf( + "cannot delete KafkaSource '%s' in namespace '%s' "+ + "because: %s", name, f.kafkaSourceClient.Namespace(), err) + } + + if err == nil { + fmt.Fprintf(cmd.OutOrStdout(), "Kafka source '%s' deleted in namespace '%s'.\n", args[0], f.kafkaSourceClient.Namespace()) + } + + return err } } diff --git a/plugins/source-kafka/pkg/factories/rune_factory_test.go b/plugins/source-kafka/pkg/factories/rune_factory_test.go index a042b1bf..8b935aa6 100644 --- a/plugins/source-kafka/pkg/factories/rune_factory_test.go +++ b/plugins/source-kafka/pkg/factories/rune_factory_test.go @@ -53,6 +53,12 @@ func TestCreateRunE(t *testing.T) { assert.Assert(t, function != nil) } +func TestDeleteRunE(t *testing.T) { + runEFactory := createKafkaSourceRunEFactory() + function := runEFactory.DeleteRunE() + assert.Assert(t, function != nil) +} + // Private func createKafkaSourceRunEFactory() types.KafkaSourceRunEFactory { diff --git a/plugins/source-kafka/pkg/types/interfaces.go b/plugins/source-kafka/pkg/types/interfaces.go index b921ade1..b2396e59 100644 --- a/plugins/source-kafka/pkg/types/interfaces.go +++ b/plugins/source-kafka/pkg/types/interfaces.go @@ -23,6 +23,7 @@ import ( type KafkaSourceClient interface { sourcetypes.KnSourceClient CreateKafkaSource(kafkaSource *v1alpha1.KafkaSource) error + DeleteKafkaSource(name string) error } type KafkaSourceFactory interface {