Not working with AWS MSK Connect #195

Open
rsharma1980 opened this issue Apr 7, 2023 · 11 comments

Comments

@rsharma1980

I am trying to create the connector in AWS MSK Connect. I took the latest zip file and created a custom plugin from it. Creating the connector fails with this error:

"There is an issue with the connector, Code: UnknownError.Unknown
Message: The last operation failed. Retry the operation"

I passed these configuration properties:

connector.class=io.aiven.kafka.connect.opensearch.OpensearchSinkConnector
connection.url=<> (port used was 9200)
tasks.max=1

I am not sure what is causing it. Is the connector expected to work with AWS?
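A minimal sketch of the same three properties as a Terraform `connector_configuration` map. Two assumptions worth checking against it: if the target is an Amazon OpenSearch Service domain, its endpoint serves HTTPS on port 443 rather than 9200; and every Kafka Connect sink connector must set `topics` or `topics.regex`, which the list above does not show. `var.opensearch_endpoint` and the topic name `my-topic` are hypothetical placeholders.

```hcl
# Hypothetical variable: host name of the OpenSearch endpoint (no scheme, no port).
variable "opensearch_endpoint" {
  type        = string
  description = "OpenSearch endpoint host name (hypothetical)"
}

locals {
  opensearch_sink_config = {
    "connector.class" = "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector"
    # Assumes an Amazon OpenSearch Service domain, which listens on 443 (HTTPS), not 9200.
    "connection.url" = "https://${var.opensearch_endpoint}:443"
    "tasks.max"      = "1"
    # Sink connectors need topics or topics.regex; "my-topic" is a placeholder.
    "topics" = "my-topic"
  }
}
```

The map can then be passed as `connector_configuration = local.opensearch_sink_config` in an `aws_mskconnect_connector` resource.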

@gharris1727

@rsharma1980 That error is very generic and you will need to investigate further. A variety of things could be wrong: networking, permissions, outages, etc. I am not aware of any incompatibilities that would prevent the connector from working and that the connector needs to fix, but if you do find any, please submit a more detailed bug report.
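As one concrete networking check, a sketch assuming the OpenSearch domain runs inside the VPC behind its own security group: the connector's security group needs outbound HTTPS to that group, and the domain's group needs to accept it. Both security group IDs are hypothetical variables.

```hcl
# Hypothetical IDs: the security group attached to the MSK Connect connector
# and the security group attached to the OpenSearch domain.
variable "connector_sg_id" { type = string }
variable "opensearch_sg_id" { type = string }

# Allow connector workers to open HTTPS connections to the OpenSearch domain.
resource "aws_security_group_rule" "connector_to_opensearch" {
  type                     = "egress"
  from_port                = 443
  to_port                  = 443
  protocol                 = "tcp"
  security_group_id        = var.connector_sg_id
  source_security_group_id = var.opensearch_sg_id
}

# Allow the OpenSearch domain to accept HTTPS from the connector workers.
resource "aws_security_group_rule" "opensearch_from_connector" {
  type                     = "ingress"
  from_port                = 443
  to_port                  = 443
  protocol                 = "tcp"
  security_group_id        = var.opensearch_sg_id
  source_security_group_id = var.connector_sg_id
}
```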

@HaroonSaid

HaroonSaid commented Apr 30, 2023

@rsharma1980 - We use Terraform to create connectors and don't have any issues connecting MSK with OpenSearch.
Sample Terraform fragment (if anyone is interested):


```hcl
resource "aws_mskconnect_connector" "connectors" {
  # One connector per enabled entry, keyed by tenant.environment.index-name
  for_each = {
    for entry in var.kafkaelasticsearch_connector : "${entry.data_source_tenant}.${entry.data_source_environment}.${replace(entry.index_name, "_", "-")}" => entry if entry.enabled == true
  }

  name                 = "sink-enriched-${replace(each.value.index_name, "_", "-")}-${each.value.data_source_tenant}-${each.value.data_source_environment}"
  kafkaconnect_version = "2.7.1"
  capacity {
    autoscaling {
      mcu_count        = 1
      min_worker_count = 1
      max_worker_count = 2

      scale_in_policy {
        cpu_utilization_percentage = 20
      }

      scale_out_policy {
        cpu_utilization_percentage = 80
      }
    }
  }

  connector_configuration = {
    "connector.class"                                 = "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector"
    "tasks.max"                                       = "2"
    "topics"                                          = each.value.topic_name
    "behavior.on.malformed.documents"                 = "FAIL"
    "behavior.on.null.values"                         = each.value.on_null_values
    "behavior.on.version.conflict"                    = "ignore"
    "connection.url"                                  = each.value.elastic_search_url
    # errors.*: tolerate record-level errors and route failed records to a dead-letter topic
    "errors.deadletterqueue.context.headers.enable"   = "true"
    "errors.deadletterqueue.topic.name"               = each.value.topic_dlq
    "errors.deadletterqueue.topic.replication.factor" = "1"
    "errors.tolerance"                                = "all"
    "key.converter"                                   = "org.apache.kafka.connect.storage.StringConverter"
    "key.ignore"                                      = "false"
    "name"                                            = "sink-enriched-${replace(each.value.index_name, "_", "-")}-${each.value.data_source_tenant}-${each.value.data_source_environment}"
    "schema.ignore"                                   = "true"
    # RegexRouter renames the topic to index_name, so records are written to that index
    "transforms"                                      = "addSuffix"
    "transforms.addSuffix.regex"                      = each.value.topic_name
    "transforms.addSuffix.replacement"                = each.value.index_name
    "transforms.addSuffix.type"                       = "org.apache.kafka.connect.transforms.RegexRouter"
    "type.name"                                       = "_doc"
    "value.converter"                                 = "org.apache.kafka.connect.json.JsonConverter"
    "value.converter.schemas.enable"                  = "false"
    "read.timeout.ms"                                 = "60000"
    "flush.timeout.ms"                                = "60000"
    "write.method"                                    = "UPSERT"
  }

  kafka_cluster {
    apache_kafka_cluster {
      bootstrap_servers = var.msk_bootstrap_brokers

      vpc {
        security_groups = [var.msk_sg.id]
        subnets         = slice(tolist(data.aws_subnets.data.ids), 0, 3)
      }
    }
  }

  # Unauthenticated, plaintext connection between the workers and the MSK brokers
  kafka_cluster_client_authentication {
    authentication_type = "NONE"
  }

  kafka_cluster_encryption_in_transit {
    encryption_type = "PLAINTEXT"
  }

  plugin {
    custom_plugin {
      arn      = data.aws_mskconnect_custom_plugin.opensearch.arn
      revision = data.aws_mskconnect_custom_plugin.opensearch.latest_revision
    }
  }

  # Send worker logs to CloudWatch Logs for troubleshooting
  log_delivery {
    worker_log_delivery {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.connector.name
      }
    }
  }
  service_execution_role_arn = aws_iam_role.kafka_connector.arn
}

resource "aws_cloudwatch_log_group" "connector" {
  name              = "/${replace(var.tenant, "_", "-")}/${var.environment}/kafka/connectors"
  retention_in_days = var.log_retention_in_days
}
```
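The fragment above references objects it does not define, such as `data.aws_mskconnect_custom_plugin.opensearch` and `aws_iam_role.kafka_connector`. A minimal sketch of both, assuming a custom plugin already exists under the hypothetical name `opensearch-sink`. The role carries only a trust policy that lets MSK Connect (`kafkaconnect.amazonaws.com`) assume it, so attach whatever permissions your setup needs; the role name is also hypothetical.

```hcl
# Look up an existing custom plugin by name (hypothetical name).
data "aws_mskconnect_custom_plugin" "opensearch" {
  name = "opensearch-sink"
}

# Trust policy allowing the MSK Connect service to assume the role.
data "aws_iam_policy_document" "kafkaconnect_assume" {
  statement {
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["kafkaconnect.amazonaws.com"]
    }
  }
}

# Service execution role for the connectors (hypothetical name).
resource "aws_iam_role" "kafka_connector" {
  name               = "msk-connect-opensearch-sink"
  assume_role_policy = data.aws_iam_policy_document.kafkaconnect_assume.json
}
```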

@rsharma1980
Author

Thank you @HaroonSaid. This is helpful.

@leopoloc0

@HaroonSaid how did you create your custom plugin? I'm facing the following error: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.aiven.kafka.connect.opensearch.OpensearchSinkConnector. However, the logs also show the custom plugin being installed:

[Worker-0fac0a20b870f596d] [2023-10-19 22:50:08,619] INFO Loading plugin from: KAFKA_DIR/plugins/OpenSearchSinkPlugIn.zip_inflated (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
[Worker-0fac0a20b870f596d] [2023-10-19 22:50:08,844] INFO Registered loader: PluginClassLoader{pluginLocation=file:KAFKA_DIR/plugins/OpenSearchSinkPlugIn.zip_inflated/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)

@HaroonSaid

This sounds like you haven't uploaded the plugin to S3 and created the custom plugin via the AWS Console.
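For anyone who prefers to do this step in Terraform rather than the console, a sketch that uploads the release zip to S3 and registers it as an MSK Connect custom plugin. The bucket variable, object key, local file name, and plugin name are all hypothetical.

```hcl
# Hypothetical: an existing S3 bucket that holds connector plugins.
variable "plugin_bucket" {
  type        = string
  description = "Existing S3 bucket for connector plugins (hypothetical)"
}

data "aws_s3_bucket" "plugins" {
  bucket = var.plugin_bucket
}

# Upload the connector zip as downloaded (hypothetical key and local path).
resource "aws_s3_object" "opensearch_plugin" {
  bucket = data.aws_s3_bucket.plugins.id
  key    = "plugins/opensearch-connector.zip"
  source = "${path.module}/opensearch-connector.zip"
  # Re-upload when the local file changes
  etag = filemd5("${path.module}/opensearch-connector.zip")
}

# Register the uploaded zip as a custom plugin.
resource "aws_mskconnect_custom_plugin" "opensearch" {
  name         = "opensearch-sink"
  content_type = "ZIP"

  location {
    s3 {
      bucket_arn = data.aws_s3_bucket.plugins.arn
      file_key   = aws_s3_object.opensearch_plugin.key
    }
  }
}
```

A resource like this could stand in for the `data.aws_mskconnect_custom_plugin` lookup in the earlier fragment, using `aws_mskconnect_custom_plugin.opensearch.arn` and `aws_mskconnect_custom_plugin.opensearch.latest_revision`.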

@leopoloc0

@HaroonSaid I actually did, but I just downloaded the latest zip and uploaded it to S3... Probably there's something wrong with it.

@HaroonSaid

Unzip it locally and compare it with an older release's zip file.

@HaroonSaid

Try an older release; not much has changed.

@HaroonSaid

@leopoloc0 keep a close eye on the cost of running MSK Connect connectors.
It's not cheap once you run more than a couple, compared to running them in a container.

@leopoloc0

@HaroonSaid thanks for the tip, man! Also, you were right: the .zip was the issue. It is now working as an MSK connector.

@HaroonSaid

@leopoloc0 - we liked the Terraform integration, but found that the cost became prohibitive at scale.

We run several hundred connectors in each production environment region.

Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants