Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-193: streaming sink improvements #310

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
"mode": "debug",
// this assumes your workspace is the root of the repo
"program": "${workspaceFolder}",
"env": {},
"env": {
"ASTRA_API_TOKEN": "<mytoken>",
},
"args": [
// pass the debug flag for reattaching
"-debug",
],
}
]
}
}
1 change: 1 addition & 0 deletions docs/resources/streaming_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ resource "astra_streaming_sink" "streaming_sink" {

### Optional

- `archive` (String) Name of the sink archive type to use. Defaults to the value of sink_name. Must be formatted as a URL, e.g. 'builtin://jdbc-clickhouse
- `deletion_protection` (Boolean) Whether or not to allow Terraform to destroy this streaming sink. Unless this field is set to false in Terraform state, a `terraform destroy` or `terraform apply` command that deletes the instance will fail. Defaults to `true`.

### Read-Only
Expand Down
186 changes: 68 additions & 118 deletions internal/provider/resource_streaming_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package provider
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"regexp"
Expand Down Expand Up @@ -65,6 +64,13 @@ func resourceStreamingSink() *schema.Resource {
ForceNew: true,
ValidateFunc: validation.StringMatch(regexp.MustCompile("^.{2,}"), "name must be atleast 2 characters"),
},
"archive": {
Description: "Name of the sink archive type to use. Defaults to the value of sink_name. Must be formatted as a URL, e.g. 'builtin://jdbc-clickhouse",
Type: schema.TypeString,
Optional: true,
ForceNew: true,
ValidateFunc: validation.StringMatch(regexp.MustCompile("^.{2,}"), "name must be atleast 2 characters"),
},
"retain_ordering": {
Description: "Retain ordering.",
Type: schema.TypeBool,
Expand Down Expand Up @@ -136,12 +142,14 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou

pulsarCluster := getPulsarCluster("", cloudProvider, region, "")

orgResp, _ := client.GetCurrentOrganization(ctx)
orgResp, err := client.GetCurrentOrganization(ctx)
if err != nil {
return diag.Errorf("failed to get current organization ID: %v", err)
}

var org OrgId
err := json.NewDecoder(orgResp.Body).Decode(&org)
if err != nil {
return diag.FromErr(err)
if err := json.NewDecoder(orgResp.Body).Decode(&org); err != nil {
return diag.Errorf("failed to decode current organization ID: %v", err)
}

token := meta.(astraClients).token
Expand Down Expand Up @@ -170,38 +178,33 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou
}

type SinkResponse struct {
Tenant string `json:"tenant"`
Namespace string `json:"namespace"`
Name string `json:"name"`
ClassName string `json:"className"`
SourceSubscriptionName interface{} `json:"sourceSubscriptionName"`
SourceSubscriptionPosition string `json:"sourceSubscriptionPosition"`
Inputs interface{} `json:"inputs"`
TopicToSerdeClassName interface{} `json:"topicToSerdeClassName"`
TopicsPattern interface{} `json:"topicsPattern"`
TopicToSchemaType interface{} `json:"topicToSchemaType"`
TopicToSchemaProperties interface{} `json:"topicToSchemaProperties"`
MaxMessageRetries interface{} `json:"maxMessageRetries"`
DeadLetterTopic interface{} `json:"deadLetterTopic"`
Configs struct {
Password string `json:"password"`
JdbcURL string `json:"jdbcUrl"`
UserName string `json:"userName"`
TableName string `json:"tableName"`
} `json:"configs"`
Secrets interface{} `json:"secrets"`
Parallelism int `json:"parallelism"`
ProcessingGuarantees string `json:"processingGuarantees"`
RetainOrdering bool `json:"retainOrdering"`
RetainKeyOrdering bool `json:"retainKeyOrdering"`
Resources interface{} `json:"resources"`
AutoAck bool `json:"autoAck"`
TimeoutMs interface{} `json:"timeoutMs"`
NegativeAckRedeliveryDelayMs interface{} `json:"negativeAckRedeliveryDelayMs"`
Archive string `json:"archive"`
CleanupSubscription interface{} `json:"cleanupSubscription"`
RuntimeFlags interface{} `json:"runtimeFlags"`
CustomRuntimeOptions interface{} `json:"customRuntimeOptions"`
Tenant string `json:"tenant"`
Namespace string `json:"namespace"`
Name string `json:"name"`
ClassName string `json:"className"`
SourceSubscriptionName interface{} `json:"sourceSubscriptionName"`
SourceSubscriptionPosition string `json:"sourceSubscriptionPosition"`
Inputs interface{} `json:"inputs"`
TopicToSerdeClassName interface{} `json:"topicToSerdeClassName"`
TopicsPattern interface{} `json:"topicsPattern"`
TopicToSchemaType interface{} `json:"topicToSchemaType"`
TopicToSchemaProperties interface{} `json:"topicToSchemaProperties"`
MaxMessageRetries interface{} `json:"maxMessageRetries"`
DeadLetterTopic interface{} `json:"deadLetterTopic"`
Configs map[string]interface{} `json:"configs"`
Secrets interface{} `json:"secrets"`
Parallelism int `json:"parallelism"`
ProcessingGuarantees string `json:"processingGuarantees"`
RetainOrdering bool `json:"retainOrdering"`
RetainKeyOrdering bool `json:"retainKeyOrdering"`
Resources interface{} `json:"resources"`
AutoAck bool `json:"autoAck"`
TimeoutMs interface{} `json:"timeoutMs"`
NegativeAckRedeliveryDelayMs interface{} `json:"negativeAckRedeliveryDelayMs"`
Archive string `json:"archive"`
CleanupSubscription interface{} `json:"cleanupSubscription"`
RuntimeFlags interface{} `json:"runtimeFlags"`
CustomRuntimeOptions interface{} `json:"customRuntimeOptions"`
}

func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics {
Expand All @@ -220,14 +223,14 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc

pulsarCluster := getPulsarCluster("", cloudProvider, region, "")

orgBody, _ := client.GetCurrentOrganization(ctx)
orgBody, err := client.GetCurrentOrganization(ctx)
if err != nil {
return diag.Errorf("failed to get current organization ID: %v", err)
}

var org OrgId
bodyBuffer, err := io.ReadAll(orgBody.Body)

err = json.Unmarshal(bodyBuffer, &org)
if err != nil {
fmt.Println("Can't deserislize", orgBody)
if err = json.NewDecoder(orgBody.Body).Decode(&org); err != nil {
return diag.Errorf("failed to decode current organization ID: %v", err)
}

token := meta.(astraClients).token
Expand All @@ -244,13 +247,14 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc
getSinkResponse, err := streamingClientv3.GetSinksWithResponse(ctx, tenantName, namespace, sinkName, &getSinksParams)
if err != nil {
diag.FromErr(err)
}
if !strings.HasPrefix(getSinkResponse.Status(), "2") {
return diag.Errorf("Error getting sinks %s", getSinkResponse.Body)
} else if getSinkResponse.StatusCode() > 299 {
return diag.Errorf("failed to get sink, status code %d, message: %s", getSinkResponse.StatusCode(), getSinkResponse.Body)
}

var sinkResponse SinkResponse
json.Unmarshal(getSinkResponse.Body, sinkResponse)
if err := json.Unmarshal(getSinkResponse.Body, &sinkResponse); err != nil {
return diag.Errorf("failed to read sink response: %v", err)
}

setStreamingSinkData(resourceData, tenantName, topic)

Expand All @@ -268,6 +272,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
tenantName := resourceData.Get("tenant_name").(string)

sinkName := resourceData.Get("sink_name").(string)
archive := resourceData.Get("archive").(string)
retainOrdering := resourceData.Get("retain_ordering").(bool)
processingGuarantees := resourceData.Get("processing_guarantees").(string)
parallelism := int32(resourceData.Get("parallelism").(int))
Expand All @@ -276,14 +281,18 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
topic := resourceData.Get("topic").(string)
autoAck := resourceData.Get("auto_ack").(bool)

if archive == "" {
archive = fmt.Sprintf("builtin://%s", sinkName)
}

orgResp, err := client.GetCurrentOrganization(ctx)
if err != nil {
return diag.FromErr(fmt.Errorf("failed to request organization: %w", err))
return diag.Errorf("failed to get current organization ID: %v", err)
}

var org OrgId
if err := json.NewDecoder(orgResp.Body).Decode(&org); err != nil {
return diag.FromErr(fmt.Errorf("failed to read organization: %w", err))
return diag.Errorf("failed to decode current organization ID: %v", err)
}

streamingClustersResponse, err := streamingClient.GetPulsarClustersWithResponse(ctx, org.ID)
Expand All @@ -292,20 +301,10 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
}

var streamingClusters StreamingClusters
err = json.Unmarshal(streamingClustersResponse.Body, &streamingClusters)
if err != nil {
if err = json.Unmarshal(streamingClustersResponse.Body, &streamingClusters); err != nil {
return diag.FromErr(fmt.Errorf("failed to read pulsar clusters: %w", err))
}

for i := 0; i < len(streamingClusters); i++ {
if streamingClusters[i].CloudProvider == cloudProvider {
if streamingClusters[i].CloudRegion == region {
// TODO - validation
//fmt.Printf("body %s", streamingClusters[i].ClusterName)
}
}
}

pulsarCluster := getPulsarCluster("", cloudProvider, region, "")

token := meta.(astraClients).token
Expand All @@ -314,59 +313,18 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
diag.FromErr(err)
}

createSinkParams := astrastreaming.CreateSinkJSONParams{
XDataStaxPulsarCluster: pulsarCluster,
//XDataStaxCurrentOrg: org.ID,
XDataStaxCurrentOrg: "",
Authorization: fmt.Sprintf("Bearer %s", pulsarToken),
var configs map[string]interface{}
if err := json.Unmarshal([]byte(rawConfigs), &configs); err != nil {
return diag.Errorf("failed to unmarshal sink config: %v", err)
}

getBuiltinSinkParams := astrastreaming.GetBuiltInSinksParams{
createSinkParams := astrastreaming.CreateSinkJSONParams{
XDataStaxPulsarCluster: pulsarCluster,
Authorization: pulsarToken,
}

builtinSinksResponse, err := streamingClientv3.GetBuiltInSinks(ctx, &getBuiltinSinkParams)
if err != nil {
return diag.FromErr(err)
}

type SinkConfig []struct {
Name string `json:"name"`
Description string `json:"description"`
SourceClass interface{} `json:"sourceClass"`
SinkClass string `json:"sinkClass"`
SourceConfigClass interface{} `json:"sourceConfigClass"`
SinkConfigClass interface{} `json:"sinkConfigClass"`
}

var builtinSinks []map[string]interface{}
if err = json.NewDecoder(builtinSinksResponse.Body).Decode(&builtinSinks); err != nil {
return diag.FromErr(fmt.Errorf("failed to read builtin sinks response: %w", err))
}

var sinkConfig map[string]interface{}
for index := range builtinSinks {
for key, element := range builtinSinks[index] {
if key == "name" {
if element == sinkName {
sinkConfig = builtinSinks[index]
}
}

}
}

var configs map[string]interface{}
json.Unmarshal([]byte(rawConfigs), &configs)

if sinkConfig == nil {
return diag.Errorf("Could not find sink name %s in prebuilt sinks", sinkName)
XDataStaxCurrentOrg: "",
Authorization: fmt.Sprintf("Bearer %s", pulsarToken),
}

archive := fmt.Sprintf("builtin://%s", sinkName)

inputs := []string{topic}
sinkInputs := []string{topic}
createSinkBody := astrastreaming.CreateSinkJSONJSONRequestBody{
Archive: &archive,
AutoAck: &autoAck,
Expand All @@ -376,7 +334,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
CustomRuntimeOptions: nil,
DeadLetterTopic: nil,
InputSpecs: nil,
Inputs: &inputs,
Inputs: &sinkInputs,
MaxMessageRetries: nil,
Name: &sinkName,
Namespace: &namespace,
Expand Down Expand Up @@ -405,7 +363,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
}
if sinkCreationResponse.StatusCode > 299 {
bodyBuffer, _ := io.ReadAll(sinkCreationResponse.Body)
return diag.Errorf("Error creating sink %s", bodyBuffer)
return diag.Errorf("failed to create sink, status code: %d, message %s", sinkCreationResponse.StatusCode, bodyBuffer)
}

setStreamingSinkData(resourceData, tenantName, topic)
Expand All @@ -425,11 +383,3 @@ func setStreamingSinkData(d *schema.ResourceData, tenantName string, topic strin

return nil
}

func parseStreamingSinkID(id string) (string, string, error) {
idParts := strings.Split(strings.ToLower(id), "/")
if len(idParts) != 1 {
return "", "", errors.New("invalid role id format: expected tenant_name/topic")
}
return idParts[0], idParts[1], nil
}
Loading