Skip to content
This repository has been archived by the owner on Mar 9, 2021. It is now read-only.

Commit

Permalink
add feature to delete a kafka source
Browse files Browse the repository at this point in the history
  • Loading branch information
Daisy Guo committed Apr 20, 2020
1 parent b43f67f commit 5eb4b46
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 11 deletions.
19 changes: 19 additions & 0 deletions plugins/source-kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,22 @@ Flags:
-s, --sink string Addressable sink for events
--topics string Topics to consume messages from
```

#### `kn source_kafka delete`

```
$ kn source_kafka delete --help
delete a Kafka source
Usage:
kafka delete NAME [flags]
Examples:
#Deletes a Kafka source with name 'mykafkasrc'
kn source_kafka delete mykafkasrc
Flags:
-A, --all-namespaces If present, list the requested object(s) across all namespaces. Namespace in current context is ignored even if specified with --namespace.
-h, --help help for delete
-n, --namespace string Specify the namespace to operate in.
```
20 changes: 15 additions & 5 deletions plugins/source-kafka/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ func NewKafkaSourceClient(kafkaParams *types.KafkaSourceParams, c clientv1alpha1
}
}

func (client *kafkaSourceClient) KnSourceParams() *sourcetypes.KnSourceParams {
return client.kafkaSourceParams.KnSourceParams
func (c *kafkaSourceClient) KnSourceParams() *sourcetypes.KnSourceParams {
return c.kafkaSourceParams.KnSourceParams
}

func (client *kafkaSourceClient) KafkaSourceParams() *types.KafkaSourceParams {
return client.kafkaSourceParams
func (c *kafkaSourceClient) KafkaSourceParams() *types.KafkaSourceParams {
return c.kafkaSourceParams
}

//CreateKafkaSource is used to create an instance of ApiServerSource
//CreateKafkaSource is used to create an instance of KafkaSource
func (c *kafkaSourceClient) CreateKafkaSource(kafkaSource *v1alpha1.KafkaSource) error {
_, err := c.client.KafkaSources(c.namespace).Create(kafkaSource)
if err != nil {
Expand All @@ -60,6 +60,16 @@ func (c *kafkaSourceClient) CreateKafkaSource(kafkaSource *v1alpha1.KafkaSource)
return nil
}

//DeleteKafkaSource is used to create an instance of KafkaSource
func (c *kafkaSourceClient) DeleteKafkaSource(name string) error {
err := c.client.KafkaSources(c.namespace).Delete(name, &metav1.DeleteOptions{})
if err != nil {
return knerrors.GetError(err)
}

return nil
}

// Return the client's namespace
func (c *kafkaSourceClient) Namespace() string {
return c.namespace
Expand Down
10 changes: 10 additions & 0 deletions plugins/source-kafka/pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ func TestCreateKafka(t *testing.T) {
assert.NilError(t, err)
}

func TestDeleteKafka(t *testing.T) {
fakeE := fake.FakeSourcesV1alpha1{Fake: &client_testing.Fake{}}
cli := NewKafkaSourceClient(&types.KafkaSourceParams{}, &fakeE, "fake-namespace")
objNew := newKafkaSource("samplekafka")
err := cli.CreateKafkaSource(objNew)
assert.NilError(t, err)
err = cli.DeleteKafkaSource("samplekafka")
assert.NilError(t, err)
}

func newKafkaSource(name string) *v1alpha1.KafkaSource {
return NewKafkaSourceBuilder(name).
BootstrapServers("test.server.org").
Expand Down
6 changes: 3 additions & 3 deletions plugins/source-kafka/pkg/factories/command_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ func (f *kafkaSourceCommandFactory) CreateCommand() *cobra.Command {
createCmd := f.defaultCommandFactory.CreateCommand()
createCmd.Short = "create NAME"
createCmd.Example = `#Creates a new Kafka source named as 'mykafkasrc' which subscribes a Kafka server 'my-cluster-kafka-bootstrap.kafka.svc:9092' at topic 'test-topic' using the consumer group ID 'test-consumer-group' and sends the event messages to service 'event-display'
kn source_kafka create kafka-name --servers my-cluster-kafka-bootstrap.kafka.svc:9092 --topics test-topic --consumergroup test-consumer-group --sink svc:event-display`
kn source_kafka create mykafkasrc --servers my-cluster-kafka-bootstrap.kafka.svc:9092 --topics test-topic --consumergroup test-consumer-group --sink svc:event-display`
return createCmd
}

func (f *kafkaSourceCommandFactory) DeleteCommand() *cobra.Command {
deleteCmd := f.defaultCommandFactory.DeleteCommand()
deleteCmd.Short = "delete NAME"
deleteCmd.Long = "delete a Kafka source"
deleteCmd.Example = `#Deletes a Kafka source with NAME
kn source_kafka delete kafka-name`
deleteCmd.Example = `#Deletes a Kafka source with name 'mykafkasrc'
kn source_kafka delete mykafkasrc`
return deleteCmd
}

Expand Down
28 changes: 25 additions & 3 deletions plugins/source-kafka/pkg/factories/rune_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func (f *kafkaSourceRunEFactory) CreateRunE() sourcetypes.RunE {
return err
}
f.kafkaSourceClient = f.KafkaSourceClient(namespace)
fmt.Printf("%s RunE function called for Kafka source: args: %#v, client: %#v\n", cmd.Name(), args, f.kafkaSourceClient)

if len(args) != 1 {
return errors.New("requires the name of the source to create as single argument")
Expand Down Expand Up @@ -110,8 +109,31 @@ func (f *kafkaSourceRunEFactory) CreateRunE() sourcetypes.RunE {

func (f *kafkaSourceRunEFactory) DeleteRunE() sourcetypes.RunE {
return func(cmd *cobra.Command, args []string) error {
fmt.Printf("%s RunE function called for Kafka source: args: %#v, client: %#v\n", cmd.Name(), args, f.kafkaSourceClient)
return nil
var err error
namespace, err := f.KnSourceParams().GetNamespace(cmd)
if err != nil {
return err
}
f.kafkaSourceClient = f.KafkaSourceClient(namespace)

if len(args) != 1 {
return errors.New("requires the name of the source to create as single argument")
}
name := args[0]

err = f.kafkaSourceClient.DeleteKafkaSource(name)

if err != nil {
return fmt.Errorf(
"cannot delete KafkaSource '%s' in namespace '%s' "+
"because: %s", name, f.kafkaSourceClient.Namespace(), err)
}

if err == nil {
fmt.Fprintf(cmd.OutOrStdout(), "Kafka source '%s' deleted in namespace '%s'.\n", args[0], f.kafkaSourceClient.Namespace())
}

return err
}
}

Expand Down
6 changes: 6 additions & 0 deletions plugins/source-kafka/pkg/factories/rune_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ func TestCreateRunE(t *testing.T) {
assert.Assert(t, function != nil)
}

func TestDeleteRunE(t *testing.T) {
runEFactory := createKafkaSourceRunEFactory()
function := runEFactory.DeleteRunE()
assert.Assert(t, function != nil)
}

// Private

func createKafkaSourceRunEFactory() types.KafkaSourceRunEFactory {
Expand Down
1 change: 1 addition & 0 deletions plugins/source-kafka/pkg/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
type KafkaSourceClient interface {
sourcetypes.KnSourceClient
CreateKafkaSource(kafkaSource *v1alpha1.KafkaSource) error
DeleteKafkaSource(name string) error
}

type KafkaSourceFactory interface {
Expand Down

0 comments on commit 5eb4b46

Please sign in to comment.