hpgrahsl kafka-connect-mongodb and geojson data #107

Closed
opistara opened this issue Oct 9, 2019 · 5 comments


opistara commented Oct 9, 2019

Hello Hans-Peter,
I'm using your kafka-connect-mongodb v1.3.1 with Kafka 0.10.1 and the Confluent Schema Registry v3.1.

I also use NiFi 1.5.0 to ingest data and publish it to a Kafka topic via the PublishKafkaRecord processor.

If I ingest a flat CSV file associated with a simple Avro schema, everything works: I ingest the CSV data, process each record, publish it to the Kafka topic, and the MongoDB sink consumes it from the topic and saves it into a MongoDB collection. Wonderful!

But if I ingest a GeoJSON file, the pipeline breaks in your MongoDB sink.

Here is more detail on my GeoJSON data processing.

1. To simplify things, I ingest a GeoJSON file that contains a single record:
[ {
  "type": "Feature",
  "geometry": {
    "type": "Polygon",
    "coordinates": [
      [
        [ 18.183730244636536, 40.35109032638699 ],
        [ 18.183746337890625, 40.351016739174426 ],
        [ 18.183730244636536, 40.35109032638699 ]
      ]
    ]
  },
  "properties": {
    "name": "Z-04",
    "description": "parking 1,30 €/hour"
  }
} ]

2. This is the Avro schema that I have registered in the Schema Registry:

{"type": "record", "name": "piano_sosta_tariffata_lecce", "namespace": "opendata.piano_sosta-le", "fields": [
{"name" : "type", "type" : "string"},
{"name" : "geometry", "type" : {
"type" : "record", "name" : "_geometry", "fields" : [
{"name": "type", "type": "string"},
{"name" : "coordinates", "type" : {
"type": "array",
"items": {
"type": "array",
"items":
{"type": "array",
"items" : "double"
}
}
}
}
]
}
},
{"name" : "properties", "type" :
{
"type" : "record", "name" : "_properties", "fields" : [
{"name": "name", "type": "string"},
{"name": "description", "type": "string"}
]
}
},
{"name": "ingestion_date", "type":
{"type": "int", "logicalType":"date"}
},
{"name": "ingestion_timestamp", "type":
{ "type": "long", "logicalType": "timestamp-millis"}
}
]
}

I have also used an online Avro validator (https://json-schema-validator.herokuapp.com/avro.jsp) to check that the schema is correct.

3. I ingest the record and publish it to the Kafka topic. Then, using the "kafka-avro-console-consumer" tool with the following command:
    sudo bin/kafka-avro-console-consumer --zookeeper hdpmaster1.it:2181 --topic raw_OD_LE_piano_sosta_tariffata --from-beginning
    I can consume the topic and see the record:
    {"type":"Feature","geometry":{"type":"Polygon","coordinates":[[[18.183730244636536,40.35109032638699],[18.183746337890625,40.351016739174426],[18.183730244636536,40.35109032638699]]]},"properties":{"name":"","description":""},"ingestion_date":0,"ingestion_timestamp":1570201210514}

4. At this point the MongoDB sink's WorkerSinkTask also tries to consume the same record from the Kafka topic, but I think it runs into a problem while converting it.
    This is the error that I found in the Kafka Connect log:

Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: [2019-10-09 13:37:09,515] ERROR Task mongodb_sink_OD_LE_piano_sosta_tariffata-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerS
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: org.apache.kafka.connect.errors.DataException: error while processing field geometry
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.processField(AvroJsonSchemafulRecordConverter.java:131)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.lambda$toBsonDoc$0(AvroJsonSchemafulRecordConverter.java:92)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.ArrayList.forEach(ArrayList.java:1257)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1080)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.toBsonDoc(AvroJsonSchemafulRecordConverter.java:92)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.convert(AvroJsonSchemafulRecordConverter.java:78)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:50)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$3(MongoDbSinkTask.java:213)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.ArrayList.forEach(ArrayList.java:1257)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:212)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:143)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:118)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.ArrayList.forEach(ArrayList.java:1257)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:117)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.HashMap.forEach(HashMap.java:1289)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:112)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.FutureTask.run(FutureTask.java:266)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.lang.Thread.run(Thread.java:748)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: Caused by: org.apache.kafka.connect.errors.DataException: error while processing field coordinates
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.processField(AvroJsonSchemafulRecordConverter.java:131)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.lambda$toBsonDoc$0(AvroJsonSchemafulRecordConverter.java:92)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.ArrayList.forEach(ArrayList.java:1257)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1080)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.toBsonDoc(AvroJsonSchemafulRecordConverter.java:92)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.handleStructField(AvroJsonSchemafulRecordConverter.java:178)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.processField(AvroJsonSchemafulRecordConverter.java:119)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: ... 26 more
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: Caused by: org.apache.kafka.connect.errors.DataException: Cannot list fields on non-struct type
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.data.ConnectSchema.fields(ConnectSchema.java:177)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.toBsonDoc(AvroJsonSchemafulRecordConverter.java:92)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.handleArrayField(AvroJsonSchemafulRecordConverter.java:168)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.processField(AvroJsonSchemafulRecordConverter.java:122)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: ... 32 more
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: [2019-10-09 13:37:09,516] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:405)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: [2019-10-09 13:37:09,516] INFO WorkerSinkTask{id=mongodb_sink_OD_LE_piano_sosta_tariffata-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: [2019-10-09 13:37:09,519] ERROR Task mongodb_sink_OD_LE_piano_sosta_tariffata-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerT
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:406)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.FutureTask.run(FutureTask.java:266)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.lang.Thread.run(Thread.java:748)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: [2019-10-09 13:37:09,519] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: [2019-10-09 13:37:09,519] INFO stopping MongoDB sink task (at.grahsl.kafka.connect.mongodb.MongoDbSinkTask:258)

I suspect that the nested arrays ("array of array") in my GeoJSON data cause the conversion problem in the MongoDB WorkerSinkTask.
Is there some aspect I am missing? Can you help me understand what the problem is and what workaround I could adopt to overcome it?
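
To illustrate what I suspect, here is a minimal sketch (my own guess at what the sink's converter sees, not its actual code). Assuming the Avro converter maps the "coordinates" field to nested Connect ARRAY schemas of doubles, asking a nested array's element schema for its fields throws exactly the exception from the log above:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class NestedArrayRepro {
    public static void main(String[] args) {
        // Connect equivalent (as I understand it) of the Avro "coordinates" field:
        // array<array<array<double>>>
        Schema position = SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(); // [lon, lat]
        Schema ring     = SchemaBuilder.array(position).build();              // linear ring
        Schema polygon  = SchemaBuilder.array(ring).build();                  // polygon

        // A converter that treats every array element as a struct would ask the
        // element schema for its fields; on a nested array this call throws
        // org.apache.kafka.connect.errors.DataException:
        // "Cannot list fields on non-struct type"
        polygon.valueSchema().fields();
    }
}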

Cheers and thx.

Orazio

@hpgrahsl hpgrahsl self-assigned this Oct 18, 2019
@hpgrahsl
Owner

@opistara thx for reaching out. glad to hear you like this project and find it useful!

Based on your detailed issue description including the logs, it looks very much like the actual problem is indeed related to multi-level nesting of arrays. I will investigate further to be sure and report back. I cannot promise when there will be a fix for this, but it shouldn't take too long.
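
Just to sketch the direction such a fix would likely take (a minimal illustration of the idea, not the connector's actual implementation): the converter needs to recurse into an array's value schema instead of assuming its elements are structs, roughly like this hypothetical helper:

import org.apache.kafka.connect.data.Schema;
import org.bson.BsonArray;
import org.bson.BsonDouble;
import org.bson.BsonValue;

import java.util.List;

public class NestedArrayHandling {
    // Hypothetical helper, not the connector's code: convert a (possibly nested)
    // Connect array value into a BsonArray by recursing one level per nesting.
    static BsonValue toBsonValue(Schema schema, Object value) {
        if (schema.type() == Schema.Type.ARRAY) {
            BsonArray array = new BsonArray();
            for (Object element : (List<?>) value) {
                array.add(toBsonValue(schema.valueSchema(), element));
            }
            return array;
        }
        // Leaf case relevant to this GeoJSON example: double coordinates.
        return new BsonDouble(((Number) value).doubleValue());
    }
}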

@hpgrahsl
Owner

@opistara you may try the connector based on this branch / snapshot

@hpgrahsl
Owner

@opistara it's merged now, so just build the latest from the master branch and give it a try.

Author

opistara commented Dec 2, 2019

@hpgrahsl thanks a lot.

@hpgrahsl
Owner

@opistara does it work for your GeoJSON use case now with the provided fix? would be nice to know :)
