Skip to content

Commit

Permalink
Upgrade ZIO to RC16 (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
iravid authored Oct 31, 2019
1 parent 5f01747 commit 43f066f
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 8 deletions.
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ buildInfoPackage := "zio.kafka"
buildInfoObject := "BuildInfo"

libraryDependencies ++= Seq(
"dev.zio" %% "zio-streams" % "1.0.0-RC15",
"dev.zio" %% "zio-test" % "1.0.0-RC15" % "test",
"dev.zio" %% "zio-test-sbt" % "1.0.0-RC15" % "test",
"dev.zio" %% "zio-streams" % "1.0.0-RC16",
"dev.zio" %% "zio-test" % "1.0.0-RC16" % "test",
"dev.zio" %% "zio-test-sbt" % "1.0.0-RC16" % "test",
"org.apache.kafka" % "kafka-clients" % "2.3.0",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"io.github.embeddedkafka" %% "embedded-kafka" % "2.3.0" % "test",
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/zio/kafka/client/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object Runloop {

val isRebalancing = rebalancingRef.get

def polls = ZStream(Command.Poll()).repeat(Schedule.spaced(pollFrequency))
def polls = ZStream(Command.Poll()).repeat(ZSchedule.spaced(pollFrequency))
def newPartition(tp: TopicPartition, data: StreamChunk[Throwable, ByteArrayCommittableRecord]) =
partitions.offer(Take.Value(tp -> data)).unit

Expand Down
6 changes: 2 additions & 4 deletions src/test/scala/zio/kafka/client/ConsumerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,13 @@ class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with Defa
.flatMap(_._2.flattenChunks)
.take(5)
.transduce(ZSink.collectAll[CommittableRecord[String, String]])
.mapM { committableRecords =>
.mapConcatM { committableRecords =>
val records = committableRecords.map(_.record)
val offsetBatch =
committableRecords.foldLeft(OffsetBatch.empty)(_ merge _.offset)

offsetBatch.commit.as(records)
}
.mapConcat(Chunk.fromIterable)
.runCollect
} yield results
}
Expand All @@ -115,14 +114,13 @@ class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with Defa
.flatMap(_._2.flattenChunks)
.take(5)
.transduce(ZSink.collectAll[CommittableRecord[String, String]])
.mapM { committableRecords =>
.mapConcatM { committableRecords =>
val records = committableRecords.map(_.record)
val offsetBatch =
committableRecords.foldLeft(OffsetBatch.empty)(_ merge _.offset)

offsetBatch.commit.as(records)
}
.mapConcat(Chunk.fromIterable)
.runCollect
} yield results
}
Expand Down

0 comments on commit 43f066f

Please sign in to comment.