Skip to content

Commit

Permalink
fix sync
Browse files Browse the repository at this point in the history
  • Loading branch information
jose-torres committed Jan 12, 2018
1 parent cea2ddc commit 49f1eb6
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -742,14 +742,14 @@ class KafkaSourceSuiteBase extends KafkaSourceTest {
val query = kafka
.writeStream
.format("memory")
.outputMode("append")
.queryName("kafkaColumnTypes")
.trigger(defaultTrigger)
.start()
query.processAllAvailable()
val rows = spark.table("kafkaColumnTypes").collect()
assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
val row = rows(0)
eventually(timeout(streamingTimeout)) {
assert(spark.table("kafkaColumnTypes").count == 1,
s"Unexpected results: ${spark.table("kafkaColumnTypes").collectAsList()}")
}
val row = spark.table("kafkaColumnTypes").head()
assert(row.getAs[Array[Byte]]("key") === null, s"Unexpected results: $row")
assert(row.getAs[Array[Byte]]("value") === "1".getBytes(UTF_8), s"Unexpected results: $row")
assert(row.getAs[String]("topic") === topic, s"Unexpected results: $row")
Expand Down

0 comments on commit 49f1eb6

Please sign in to comment.