Skip to content

Commit

Permalink
Fixup CI -- update go to 1.10, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Mongey committed Jul 19, 2018
1 parent a33f2ec commit 9588f49
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 39 deletions.
26 changes: 25 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ jobs:
build:
working_directory: /go/src/github.com/Mongey/terraform-provider-kafka-connect
docker:
- image: circleci/golang:1.9
- image: circleci/golang:1.10
environment:
TEST_RESULTS: /tmp/test-results
- image: confluentinc/cp-zookeeper:latest
Expand All @@ -16,6 +16,23 @@ jobs:
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
- image: confluentinc/cp-kafka-connect:4.0.0-2
environment:
CONNECT_BOOTSTRAP_SERVERS: PLAINTEXT://localhost:9092
CONNECT_GROUP_ID: connect
CONNECT_REST_PORT: 8083
CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-config"
CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "quickstart-status"
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
CONNECT_PLUGIN_PATH: /usr/share/java
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
steps:
- checkout
- run: go build
Expand All @@ -24,6 +41,13 @@ jobs:
destination: terraform-provider-kafka
- run: go get github.com/jstemmer/go-junit-report
- run: mkdir -p $TEST_RESULTS
- run:
name: Wait for connect
command: |
until $(curl --output /dev/null --silent --head --fail http://localhost:8083); do
printf '.'
sleep 5
done
- run:
name: Run Tests
command: |
Expand Down
2 changes: 1 addition & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ test:
xargs -t -n4 go test $(TESTARGS) -timeout=30s -parallel=4

testacc:
KAFKA_BROKER=localhost:9092 TF_ACC=1 go test $(TEST) -v $(TESTARGS) -timeout 120m
KAFKA_CONNECT_URL=http://localhost:8083 TF_LOG=debug TF_ACC=1 go test $(TEST) -v $(TESTARGS) -timeout 120m

.PHONY: build test testacc
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ your [terraform plugin directory][third-party-plugins] (typically `~/.terraform.

## Example

Configure the provider directly, or set the Env variable `KAFKA_CONNECT_URL`
```hcl
provider "kafka-connect" {
url = "http://localhost:8083"
Expand Down
13 changes: 9 additions & 4 deletions connect/provider.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
package connect

import (
"log"

"github.com/hashicorp/terraform/helper/schema"
"github.com/hashicorp/terraform/terraform"
kc "github.com/ricardo-ch/go-kafka-connect/lib/connectors"
)

func Provider() terraform.ResourceProvider {
log.Printf("[INFO] Creating Provider")
return &schema.Provider{
Schema: map[string]*schema.Schema{
"url": {
Type: schema.TypeString,
Required: true,
Type: schema.TypeString,
Optional: true,
DefaultFunc: schema.EnvDefaultFunc("KAFKA_CONNECT_URL", ""),
},
},

ConfigureFunc: providerConfigure,
ResourcesMap: map[string]*schema.Resource{
"kafka-connect_connector": kafkaConnectorResource(),
Expand All @@ -23,7 +26,9 @@ func Provider() terraform.ResourceProvider {
}

func providerConfigure(d *schema.ResourceData) (interface{}, error) {
c := kc.NewClient(d.Get("url").(string))
log.Printf("[INFO] Initializing KafkaConnect client")
addr := d.Get("url").(string)
c := kc.NewClient(addr)

return c, nil
}
18 changes: 12 additions & 6 deletions connect/provider_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package connect

import (
"log"
"testing"

"github.com/hashicorp/terraform/helper/schema"
Expand All @@ -10,18 +11,23 @@ import (
var testProvider *schema.Provider
var testProviders map[string]terraform.ResourceProvider

func init() {
testProvider = Provider().(*schema.Provider)
testProviders = map[string]terraform.ResourceProvider{
"kafka-connect": testProvider,
}
}

func TestProvider(t *testing.T) {
if err := Provider().(*schema.Provider).InternalValidate(); err != nil {
t.Fatalf("err: %s", err)
}
}

func testAccPreCheck(t *testing.T) {
}

func accProvider() map[string]terraform.ResourceProvider {
provider := Provider().(*schema.Provider)
return map[string]terraform.ResourceProvider{
"kafka-connect": provider,
client := testProvider.Meta()
log.Printf("[INFO] Checking KafkaConnect client")
if client == nil {
//t.Fatal("No client")
}
}
60 changes: 33 additions & 27 deletions connect/resource_kafka_connector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package connect

import (
"fmt"
"log"

"github.com/hashicorp/terraform/helper/schema"
Expand Down Expand Up @@ -35,26 +36,17 @@ func kafkaConnectorResource() *schema.Resource {

func connectorCreate(d *schema.ResourceData, meta interface{}) error {
c := meta.(kc.Client)
name := d.Get("name").(string)

cfg := d.Get("config").(map[string]interface{})
config := make(map[string]string)

for k, v := range cfg {
switch v := v.(type) {
case string:
config[k] = v
}
}

name := nameFromRD(d)
config := configFromRD(d)
req := kc.CreateConnectorRequest{
ConnectorRequest: kc.ConnectorRequest{
Name: name,
},
Config: config,
}

_, err := c.CreateConnector(req, true)
connectorResponse, err := c.CreateConnector(req, true)
fmt.Printf("[INFO] Created the connector %v\n", connectorResponse)

if err == nil {
d.SetId(name)
Expand All @@ -63,14 +55,21 @@ func connectorCreate(d *schema.ResourceData, meta interface{}) error {
return err
}

func nameFromRD(d *schema.ResourceData) string {
return d.Get("name").(string)
}

func connectorDelete(d *schema.ResourceData, meta interface{}) error {
c := meta.(kc.Client)

name := nameFromRD(d)
req := kc.ConnectorRequest{
Name: d.Get("name").(string),
Name: name,
}
_, err := c.DeleteConnector(req, true)

fmt.Printf("[INFO] Deleing the connector %s\n", name)

_, err := c.DeleteConnector(req, true)
if err == nil {
d.SetId("")
}
Expand All @@ -81,16 +80,8 @@ func connectorDelete(d *schema.ResourceData, meta interface{}) error {
func connectorUpdate(d *schema.ResourceData, meta interface{}) error {
c := meta.(kc.Client)

name := d.Get("name").(string)
cfg := d.Get("config").(map[string]interface{})
config := make(map[string]string)

for k, v := range cfg {
switch v := v.(type) {
case string:
config[k] = v
}
}
name := nameFromRD(d)
config := configFromRD(d)

req := kc.CreateConnectorRequest{
ConnectorRequest: kc.ConnectorRequest{
Expand All @@ -103,12 +94,13 @@ func connectorUpdate(d *schema.ResourceData, meta interface{}) error {
conn, err := c.UpdateConnector(req, true)

if err == nil {
log.Printf("[INFO] this the shit %v", conn.Config)
log.Printf("[INFO] Config updated %v", conn.Config)
d.Set("config", conn.Config)
}

return err
}

func connectorRead(d *schema.ResourceData, meta interface{}) error {
c := meta.(kc.Client)

Expand All @@ -120,9 +112,23 @@ func connectorRead(d *schema.ResourceData, meta interface{}) error {
conn, err := c.GetConnector(req)

if err == nil {
log.Printf("[INFO] this the shit %v", conn.Config)
log.Printf("[INFO] found the config %v", conn.Config)
d.Set("config", conn.Config)
}

return err
}

func configFromRD(d *schema.ResourceData) map[string]string {
cfg := d.Get("config").(map[string]interface{})
config := make(map[string]string)

for k, v := range cfg {
switch v := v.(type) {
case string:
config[k] = v
}
}

return config
}
107 changes: 107 additions & 0 deletions connect/resource_kafka_connector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package connect

import (
"fmt"
"testing"

r "github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
kc "github.com/ricardo-ch/go-kafka-connect/lib/connectors"
)

func TestAccConnectorConfigUpdate(t *testing.T) {
r.Test(t, r.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testProviders,
Steps: []r.TestStep{
{
Config: testResourceConnector_initialConfig,
Check: testResourceConnector_initialCheck,
},
{
Config: testResourceConnector_updateConfig,
Check: testResourceConnector_updateCheck,
},
},
})
}

func testResourceConnector_initialCheck(s *terraform.State) error {
resourceState := s.Modules[0].Resources["kafka-connect_connector.test"]
if resourceState == nil {
return fmt.Errorf("resource not found in state")
}

instanceState := resourceState.Primary
if instanceState == nil {
return fmt.Errorf("resource has no primary instance")
}

name := instanceState.ID

if name != instanceState.Attributes["name"] {
return fmt.Errorf("id doesn't match name")
}

client := testProvider.Meta().(kc.Client)

c, err := client.GetConnector(kc.ConnectorRequest{Name: "sqlite-sink"})
if err != nil {
return err
}

tasksMax := c.Config["tasks.max"]
expected := "2"
if tasksMax != expected {
return fmt.Errorf("tasks.max should be %s, got %s connector not updated. \n %v", expected, tasksMax, c.Config)
}

return nil
}

func testResourceConnector_updateCheck(s *terraform.State) error {
client := testProvider.Meta().(kc.Client)

c, err := client.GetConnector(kc.ConnectorRequest{Name: "sqlite-sink"})
if err != nil {
return err
}

tasksMax := c.Config["tasks.max"]
expected := "1"
if tasksMax != expected {
return fmt.Errorf("tasks.max should be %s, got %s connector not updated. \n %v", expected, tasksMax, c.Config)
}

return nil
}

const testResourceConnector_initialConfig = `
resource "kafka-connect_connector" "test" {
name = "sqlite-sink"
config = {
"name" = "sqlite-sink"
"connector.class" = "io.confluent.connect.jdbc.JdbcSinkConnector"
"tasks.max" = "2"
"topics" = "orders"
"connection.url" = "jdbc:sqlite:test.db"
"auto.create" = "true"
}
}
`

const testResourceConnector_updateConfig = `
resource "kafka-connect_connector" "test" {
name = "sqlite-sink"
config = {
"name" = "sqlite-sink"
"connector.class" = "io.confluent.connect.jdbc.JdbcSinkConnector"
"tasks.max" = "1"
"topics" = "orders"
"connection.url" = "jdbc:sqlite:test.db"
"auto.create" = "true"
}
}
`

0 comments on commit 9588f49

Please sign in to comment.