Skip to content

Commit

Permalink
fixes #567: ( Impact on BC/DR - Data loss ) - High priority, Nodes wi…
Browse files Browse the repository at this point in the history
…th PointValue property cannot be successfully sink to Neo4j With Neo4j Streams Plugin
  • Loading branch information
conker84 committed Aug 30, 2023
1 parent 6bfff36 commit 8197995
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 6 deletions.
2 changes: 0 additions & 2 deletions common/src/main/kotlin/streams/events/StreamsEvent.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package streams.events

import org.neo4j.graphdb.schema.ConstraintType

enum class OperationType { created, updated, deleted }

data class Meta(val timestamp: Long,
Expand Down
129 changes: 127 additions & 2 deletions common/src/main/kotlin/streams/utils/JSONUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,38 @@ package streams.utils

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.*
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.JsonDeserializer
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.JsonSerializer
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.convertValue
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import org.neo4j.driver.internal.value.PointValue
import org.neo4j.function.ThrowingBiConsumer
import org.neo4j.graphdb.spatial.Point
import org.neo4j.values.AnyValue
import org.neo4j.values.storable.CoordinateReferenceSystem
import streams.events.*
import org.neo4j.values.storable.Values
import org.neo4j.values.virtual.MapValue
import org.neo4j.values.virtual.MapValueBuilder
import streams.events.EntityType
import streams.events.Meta
import streams.events.NodePayload
import streams.events.Payload
import streams.events.RecordChange
import streams.events.RelationshipPayload
import streams.events.Schema
import streams.events.StreamsTransactionEvent
import streams.events.StreamsTransactionNodeEvent
import streams.events.StreamsTransactionRelationshipEvent
import streams.extensions.asStreamsMap
import java.io.IOException
import java.time.temporal.TemporalAccessor
Expand All @@ -33,6 +55,13 @@ fun Point.toStreamsPoint(): StreamsPoint {
}
}

fun Map<String, Any>.toMapValue(): MapValue {
val map = this
val builder = MapValueBuilder()
map.forEach { (t, u) -> builder.add(t, Values.of(u) ) }
return builder.build()
}

fun PointValue.toStreamsPoint(): StreamsPoint {
val point = this.asPoint()
return when (val crsType = point.srid()) {
Expand Down Expand Up @@ -121,6 +150,100 @@ class DriverRelationshipSerializer : JsonSerializer<org.neo4j.driver.types.Relat
}
}

class StreamsTransactionRelationshipEventDeserializer : StreamsTransactionEventDeserializer<StreamsTransactionRelationshipEvent, RelationshipPayload>() {
override fun createEvent(meta: Meta, payload: RelationshipPayload, schema: Schema): StreamsTransactionRelationshipEvent {
return StreamsTransactionRelationshipEvent(meta, payload, schema)
}

override fun convertPayload(payloadMap: JsonNode): RelationshipPayload {
return JSONUtils.convertValue<RelationshipPayload>(payloadMap)
}

override fun fillPayload(payload: RelationshipPayload,
beforeProps: Map<String, Any>?,
afterProps: Map<String, Any>?): RelationshipPayload {
return payload.copy(
before = payload.before?.copy(properties = beforeProps),
after = payload.after?.copy(properties = afterProps)
)
}

override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionRelationshipEvent {
val deserialized = super.deserialize(parser, context)
if (deserialized.payload.type == EntityType.node) {
throw IllegalArgumentException("Relationship event expected, but node type found")
}
return deserialized
}

}

class StreamsTransactionNodeEventDeserializer : StreamsTransactionEventDeserializer<StreamsTransactionNodeEvent, NodePayload>() {
override fun createEvent(meta: Meta, payload: NodePayload, schema: Schema): StreamsTransactionNodeEvent {
return StreamsTransactionNodeEvent(meta, payload, schema)
}

override fun convertPayload(payloadMap: JsonNode): NodePayload {
return JSONUtils.convertValue<NodePayload>(payloadMap)
}

override fun fillPayload(payload: NodePayload,
beforeProps: Map<String, Any>?,
afterProps: Map<String, Any>?): NodePayload {
return payload.copy(
before = payload.before?.copy(properties = beforeProps),
after = payload.after?.copy(properties = afterProps)
)
}

override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionNodeEvent {
val deserialized = super.deserialize(parser, context)
if (deserialized.payload.type == EntityType.relationship) {
throw IllegalArgumentException("Node event expected, but relationship type found")
}
return deserialized
}

}

abstract class StreamsTransactionEventDeserializer<EVENT, PAYLOAD: Payload> : JsonDeserializer<EVENT>() {

abstract fun createEvent(meta: Meta, payload: PAYLOAD, schema: Schema): EVENT
abstract fun convertPayload(payloadMap: JsonNode): PAYLOAD
abstract fun fillPayload(payload: PAYLOAD,
beforeProps: Map<String, Any>?,
afterProps: Map<String, Any>?): PAYLOAD

@Throws(IOException::class, JsonProcessingException::class)
override fun deserialize(parser: JsonParser, context: DeserializationContext): EVENT {
val root: JsonNode = parser.codec.readTree(parser)
val meta = JSONUtils.convertValue<Meta>(root["meta"])
val schema = JSONUtils.convertValue<Schema>(root["schema"])
val points = schema.properties.filterValues { it == "PointValue" }.keys
var payload = convertPayload(root["payload"])
if (points.isNotEmpty()) {
val beforeProps = convertPoints(payload.before, points)
val afterProps = convertPoints(payload.after, points)
payload = fillPayload(payload, beforeProps, afterProps)
}
return createEvent(meta, payload, schema)
}

private fun convertPoints(
recordChange: RecordChange?,
points: Set<String>
) = recordChange
?.properties
?.mapValues {
if (points.contains(it.key)) {
org.neo4j.values.storable.PointValue.fromMap((it.value as Map<String, Any>).toMapValue())
} else {
it.value
}
}

}

object JSONUtils {

private val OBJECT_MAPPER: ObjectMapper = jacksonObjectMapper()
Expand All @@ -133,6 +256,8 @@ object JSONUtils {
StreamsUtils.ignoreExceptions({ module.addSerializer(org.neo4j.driver.types.Point::class.java, DriverPointSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
StreamsUtils.ignoreExceptions({ module.addSerializer(org.neo4j.driver.types.Node::class.java, DriverNodeSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
StreamsUtils.ignoreExceptions({ module.addSerializer(org.neo4j.driver.types.Relationship::class.java, DriverRelationshipSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
StreamsUtils.ignoreExceptions({ module.addDeserializer(StreamsTransactionRelationshipEvent::class.java, StreamsTransactionRelationshipEventDeserializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
StreamsUtils.ignoreExceptions({ module.addDeserializer(StreamsTransactionNodeEvent::class.java, StreamsTransactionNodeEventDeserializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
module.addSerializer(TemporalAccessor::class.java, TemporalAccessorSerializer())
OBJECT_MAPPER.registerModule(module)
OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
Expand Down
150 changes: 150 additions & 0 deletions consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCDCTSE.kt
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,156 @@ class KafkaEventSinkCDCTSE: KafkaEventSinkBaseTSE() {
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
}

@Test
fun shouldWriteDataFromSinkWithCDCSchemaTopicAndPointValue() = runBlocking {
val topic = UUID.randomUUID().toString()
db.setConfig("streams.sink.topic.cdc.schema", topic)
db.start()

val constraints = listOf(Constraint(label = "User", type = StreamsConstraintType.UNIQUE, properties = setOf("name", "surname")))
val nodeSchema = Schema(
properties = mapOf(
"name" to "String",
"surname" to "String",
"comp@ny" to "String",
"bornIn2d" to "PointValue",
"bornIn3d" to "PointValue",
"livesIn2d" to "PointValue",
"livesIn3d" to "PointValue",
"worksIn2d" to "PointValue",
"worksIn3d" to "PointValue"
),
constraints = constraints)
val relSchema = Schema(properties = mapOf(
"since" to "Long",
"where" to "PointValue"
), constraints = constraints)
val cdcDataStart = StreamsTransactionEvent(
meta = Meta(timestamp = System.currentTimeMillis(),
username = "user",
txId = 1,
txEventId = 0,
txEventsCount = 3,
operation = OperationType.created
),
payload = NodePayload(
id = "0",
before = null,
after = NodeChange(
properties = mapOf(
"name" to "Andrea",
"surname" to "Santurbano",
"comp@ny" to "LARUS-BA",
"bornIn3d" to mapOf(
"crs" to "wgs-84-3d",
"latitude" to 12.78,
"longitude" to 56.7,
"height" to 100.0,
),
"bornIn2d" to mapOf(
"crs" to "wgs-84",
"latitude" to 12.78,
"longitude" to 56.7
),
"livesIn3d" to mapOf(
"crs" to "wgs-84-3d",
"latitude" to 12.79,
"longitude" to 56.71,
"height" to 100.0,
),
"livesIn2d" to mapOf(
"crs" to "wgs-84",
"latitude" to 12.79,
"longitude" to 56.71
),
"worksIn2d" to mapOf(
"crs" to "cartesian",
"x" to 1.2,
"y" to 10.1
),
"worksIn3d" to mapOf(
"crs" to "cartesian-3d",
"x" to 1.2,
"y" to 10.1,
"z" to 7.1
)
),
labels = listOf("User")
)
),
schema = nodeSchema
)
val cdcDataEnd = StreamsTransactionEvent(
meta = Meta(timestamp = System.currentTimeMillis(),
username = "user",
txId = 1,
txEventId = 1,
txEventsCount = 3,
operation = OperationType.created
),
payload = NodePayload(id = "1",
before = null,
after = NodeChange(properties = mapOf("name" to "Michael", "surname" to "Hunger", "comp@ny" to "Neo4j"), labels = listOf("User"))
),
schema = nodeSchema
)
val cdcDataRelationship = StreamsTransactionEvent(
meta = Meta(timestamp = System.currentTimeMillis(),
username = "user",
txId = 1,
txEventId = 2,
txEventsCount = 3,
operation = OperationType.created
),
payload = RelationshipPayload(
id = "2",
start = RelationshipNodeChange(id = "0", labels = listOf("User"), ids = mapOf("name" to "Andrea", "surname" to "Santurbano")),
end = RelationshipNodeChange(id = "1", labels = listOf("User"), ids = mapOf("name" to "Michael", "surname" to "Hunger")),
after = RelationshipChange(properties = mapOf(
"since" to 2014,
"where" to mapOf(
"crs" to "wgs-84-3d",
"latitude" to 12.78,
"longitude" to 56.7,
"height" to 80.0,
)
)),
before = null,
label = "MEET"
),
schema = relSchema
)
var producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataStart))
kafkaProducer.send(producerRecord).get()
producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataEnd))
kafkaProducer.send(producerRecord).get()
producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataRelationship))
kafkaProducer.send(producerRecord).get()

Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
val query = """
|MATCH (s:User{
| name:'Andrea',
| surname:'Santurbano',
| `comp@ny`:'LARUS-BA',
| bornIn3d: point({x: 56.7, y: 12.78, z: 100.0, crs: 'wgs-84-3d'}),
| bornIn2d: point({x: 56.7, y: 12.78, crs: 'wgs-84'}),
| livesIn3d: point({longitude: 56.71, latitude: 12.79, height: 100}),
| livesIn2d: point({longitude: 56.71, latitude: 12.79}),
| worksIn2d: point({x: 1.2, y: 10.1, crs: 'cartesian'}),
| worksIn3d: point({x: 1.2, y: 10.1, z: 7.1, crs: 'cartesian-3d'})
|})
|MATCH (t:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})
|MATCH p = (s)-[r:MEET{since: 2014, where: point({x: 56.7, y: 12.78, z: 80.0, crs: 'wgs-84-3d'})}]->(t)
|RETURN count(p) AS count
|""".trimMargin()
db.execute(query) {
val result = it.columnAs<Long>("count")
result.hasNext() && result.next() == 1L && !result.hasNext()
}
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
}

@Test
fun writeDataFromSinkWithCDCSchemaTopicMultipleConstraintsAndLabels() = runBlocking {
val topic = UUID.randomUUID().toString()
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<modules>
<module>common</module>
<module>test-support</module>
<module>kafka-connect-neo4j</module>
<!-- <module>kafka-connect-neo4j</module>-->
<module>producer</module>
<module>consumer</module>
<module>doc</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class KafkaEventRouterEnterpriseTSE {
}
StreamsUtils.ignoreExceptions({
neo4j.withWaitStrategy(LogMessageWaitStrategy()
.withRegEx(".*\\[(${DB_NAME_NAMES.joinToString("|")}|neo4j)] \\[Source\\] Streams Source module initialised\n")
.withRegEx(".*\\[(${DB_NAME_NAMES.joinToString("|")}|neo4j)/\\w+\\] \\[Source\\] Streams Source module initialised\n")
.withTimes(DB_NAME_NAMES.size + 1)
.withStartupTimeout(Duration.ofMinutes(10)))
DB_NAME_NAMES.forEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,36 @@ class KafkaEventRouterSimpleTSE: KafkaEventRouterBaseTSE() {
})
}

@Test
fun testCreateNodeWithPointValue() {
db.start()
kafkaConsumer.subscribe(listOf("neo4j"))
db.execute("CREATE (:Person {name:'John Doe', age:42, bornIn: point({longitude: 12.78, latitude: 56.7, height: 100})})")
val records = kafkaConsumer.poll(5000)
assertEquals(1, records.count())
assertEquals(true, records.all {
JSONUtils.asStreamsTransactionEvent(it.value()).let {
val after = it.payload.after as NodeChange
val labels = after.labels
val propertiesAfter = after.properties
val expectedProperties = mapOf(
"name" to "John Doe",
"age" to 42,
"bornIn" to mapOf(
"crs" to "wgs-84-3d",
"latitude" to 12.78,
"longitude" to 56.7,
"height" to 100.0,
)
)
labels == listOf("Person") && propertiesAfter == expectedProperties
&& it.meta.operation == OperationType.created
&& it.schema.properties == mapOf("name" to "String", "age" to "Long", "bornIn" to "PointValue")
&& it.schema.constraints.isEmpty()
}
})
}

@Test
fun testCreateRelationshipWithRelRouting() {
db.setConfig("streams.source.topic.relationships.knows", "KNOWS{*}").start()
Expand Down

0 comments on commit 8197995

Please sign in to comment.