Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 27, 2024
1 parent 753c2af commit 0ca2f82
Showing 1 changed file with 22 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import pekko.http.scaladsl.unmarshalling.{ FromResponseUnmarshaller, Unmarshal,
import pekko.stream.Materializer
import pekko.stream.connectors.google.http.GoogleHttp
import pekko.stream.connectors.google.util.{ AnnotateLast, EitherFlow, MaybeLast, Retry }
import pekko.stream.scaladsl.{ Flow, Keep, RetryFlow, Sink }
import pekko.stream.scaladsl.{ Flow, Keep, RetryFlow, Sink, Source }
import pekko.util.ByteString

import scala.concurrent.Future
Expand Down Expand Up @@ -179,22 +179,27 @@ private[connectors] object ResumableUpload {

private def chunker(chunkSize: Int): Flow[ByteString, ByteString, NotUsed] =
Flow[ByteString]
.map(Some(_))
.concat(Source.single(None))
.statefulMap(() => ByteString.newBuilder)((chunkBuilder, bytes) => {
chunkBuilder ++= bytes
if (chunkBuilder.length < chunkSize) {
(chunkBuilder, ByteString.empty)
} else if (chunkBuilder.length == chunkSize) {
chunkBuilder.clear()
(chunkBuilder, chunkBuilder.result())
} else {
// chunkBuilder.length > chunkSize
val result = chunkBuilder.result()
chunkBuilder.clear()
val (chunk, init) = result.splitAt(chunkSize)
chunkBuilder ++= init
(chunkBuilder, chunk)
val result = bytes.fold(Some(chunkBuilder.result()).filter(_.nonEmpty).toList) { bytes =>
chunkBuilder ++= bytes
if (chunkBuilder.length < chunkSize) {
Nil
} else if (chunkBuilder.length == chunkSize) {
val chunk = chunkBuilder.result()
chunkBuilder.clear()
chunk :: Nil
} else {
// chunkBuilder.length > chunkSize
val result = chunkBuilder.result()
chunkBuilder.clear()
val (chunk, init) = result.splitAt(chunkSize)
chunkBuilder ++= init
chunk :: Nil
}
}
}, chunkBuilder => Some(chunkBuilder.result()))
.filter(_.nonEmpty)

(chunkBuilder, result)
}, _ => None)
.mapConcat(identity())
}

0 comments on commit 0ca2f82

Please sign in to comment.