diff --git a/docs/src/main/paradox/file.md b/docs/src/main/paradox/file.md
index fd99b952c9..1b9ca36056 100644
--- a/docs/src/main/paradox/file.md
+++ b/docs/src/main/paradox/file.md
@@ -168,6 +168,8 @@ This example can be found in the @ref:[self-contained example documentation sect
 
 ## ZIP Archive
 
+### Writing ZIP Archives
+
 The @apidoc[Archive$] contains flow for compressing multiple files into one ZIP file.
 
@@ -182,6 +184,21 @@ Scala
 
 Java
 : @@snip [snip](/file/src/test/java/docs/javadsl/ArchiveTest.java) { #sample-zip }
 
+### Reading ZIP Archives
+
+@apidoc[Archive.zipReader()](Archive$) reads a file in ZIP format, emitting the metadata entry and a `Source` of the contents for every file in the archive.
+It is not required to consume every file's `Source`; files can also be read in parallel. (Every sub-source will seek into the archive.)
+
+The example below reads the incoming file and unzips all entries to the local file system.
+
+Scala
+: @@snip [snip](/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala) { #zip-reader }
+
+Java
+: @@snip [snip](/file/src/test/java/docs/javadsl/ArchiveTest.java) { #sample-zip-read }
+
+
 ## TAR Archive
 
 ### Writing TAR archives
diff --git a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipReaderSource.scala b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipReaderSource.scala
new file mode 100644
index 0000000000..901eefebe0
--- /dev/null
+++ b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipReaderSource.scala
@@ -0,0 +1,100 @@
/*
 * Copyright (C) 2016-2020 Lightbend Inc.
+ */ + +package akka.stream.alpakka.file.impl.archive + +import akka.NotUsed +import akka.annotation.InternalApi +import akka.stream.{Attributes, Outlet, SourceShape} +import akka.stream.scaladsl.Source +import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler} +import akka.util.ByteString + +import java.io.{File, FileInputStream} +import java.util.zip.{ZipEntry, ZipInputStream} + +case class ZipArchiveMetadata(name: String) + +@InternalApi class ZipEntrySource(n: ZipArchiveMetadata, f: File, chunkSize: Int) + extends GraphStage[SourceShape[ByteString]] { + private val out = Outlet[ByteString]("flowOut") + override val shape: SourceShape[ByteString] = + SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) { + val zis = new ZipInputStream(new FileInputStream(f)) + var entry: ZipEntry = null + val data = new Array[Byte](chunkSize) + + def seek() = { + while ({ + entry = zis.getNextEntry() + entry != null && entry.getName != n.name + }) { + zis.closeEntry() + } + } + + setHandler( + out, + new OutHandler { + override def onPull(): Unit = { + if (entry == null) { + seek() + if (entry == null) { + failStage(new Exception("After a seek the part is not found")) + } + } + + val c = zis.read(data, 0, chunkSize) + if (c == -1) { + completeStage() + } else { + push(out, ByteString.fromArray(data, 0, c)) + } + } + } + ) + + override def postStop(): Unit = { + super.postStop() + zis.close() + } + } +} + +@InternalApi class ZipSource(f: File, chunkSize: Int) + extends GraphStage[SourceShape[(ZipArchiveMetadata, Source[ByteString, NotUsed])]] { + private val out = Outlet[(ZipArchiveMetadata, Source[ByteString, NotUsed])]("flowOut") + override val shape: SourceShape[(ZipArchiveMetadata, Source[ByteString, NotUsed])] = + SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) { + val zis = new ZipInputStream(new 
FileInputStream(f)) + + setHandler( + out, + new OutHandler { + override def onPull(): Unit = { + val e = zis.getNextEntry + if (e != null) { + val n = ZipArchiveMetadata(e.getName) + zis.closeEntry() + push(out, n -> Source.fromGraph(new ZipEntrySource(n, f, chunkSize))) + } else { + zis.close() + completeStage() + } + } + } + ) + + override def postStop(): Unit = { + super.postStop() + zis.close() + } + } +} diff --git a/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala b/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala index 07d5b14d7c..bed6691887 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala @@ -9,9 +9,11 @@ import akka.stream.alpakka.file.{scaladsl, ArchiveMetadata, TarArchiveMetadata} import akka.stream.javadsl.Flow import akka.util.ByteString import akka.japi.Pair -import akka.stream.alpakka.file.impl.archive.TarReaderStage +import akka.stream.alpakka.file.impl.archive.{TarReaderStage, ZipArchiveMetadata, ZipSource} import akka.stream.javadsl.Source +import java.io.File + /** * Java API. */ @@ -26,6 +28,19 @@ object Archive { .map(func(pair => (pair.first, pair.second.asScala))) .via(scaladsl.Archive.zip().asJava) + /** + * Flow for reading ZIP files. + */ + def zipReader(file: File, chunkSize: Int): Source[Pair[ZipArchiveMetadata, Source[ByteString, NotUsed]], NotUsed] = + Source + .fromGraph(new ZipSource(file, chunkSize)) + .map(func { + case (metadata, source) => + Pair(metadata, source.asJava) + }) + def zipReader(file: File): Source[Pair[ZipArchiveMetadata, Source[ByteString, NotUsed]], NotUsed] = + zipReader(file, 8192) + /** * Flow for packaging multiple files into one TAR file. 
*/ diff --git a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Archive.scala b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Archive.scala index 0acc05af05..17ea631f3d 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Archive.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Archive.scala @@ -6,10 +6,18 @@ package akka.stream.alpakka.file.scaladsl import akka.NotUsed import akka.stream.alpakka.file.{ArchiveMetadata, TarArchiveMetadata} -import akka.stream.alpakka.file.impl.archive.{TarArchiveManager, TarReaderStage, ZipArchiveManager} +import akka.stream.alpakka.file.impl.archive.{ + TarArchiveManager, + TarReaderStage, + ZipArchiveManager, + ZipArchiveMetadata, + ZipSource +} import akka.stream.scaladsl.{Flow, Source} import akka.util.ByteString +import java.io.File + /** * Scala API. */ @@ -21,6 +29,14 @@ object Archive { def zip(): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = ZipArchiveManager.zipFlow() + /** + * Flow for reading ZIP files. + */ + def zipReader(file: File, chunkSize: Int): Source[(ZipArchiveMetadata, Source[ByteString, Any]), NotUsed] = + Source.fromGraph(new ZipSource(file, chunkSize)) + def zipReader(file: File): Source[(ZipArchiveMetadata, Source[ByteString, Any]), NotUsed] = + Source.fromGraph(new ZipSource(file, 8192)) + /** * Flow for packaging multiple files into one TAR file. 
*/ diff --git a/file/src/test/java/docs/javadsl/ArchiveTest.java b/file/src/test/java/docs/javadsl/ArchiveTest.java index 0a7423f921..22fcd462d2 100644 --- a/file/src/test/java/docs/javadsl/ArchiveTest.java +++ b/file/src/test/java/docs/javadsl/ArchiveTest.java @@ -12,6 +12,7 @@ import akka.stream.Materializer; import akka.stream.alpakka.file.ArchiveMetadata; import akka.stream.alpakka.file.TarArchiveMetadata; +import akka.stream.alpakka.file.impl.archive.ZipArchiveMetadata; import akka.stream.alpakka.file.javadsl.Archive; import akka.stream.alpakka.file.javadsl.Directory; import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4; @@ -106,6 +107,23 @@ public void flowShouldCreateZIPArchive() throws Exception { Map unzip = archiveHelper.unzip(resultFileContent); assertThat(inputFiles, is(unzip)); + Path target = Files.createTempDirectory("alpakka-tar-"); + + // #sample-zip-read + Archive.zipReader(Paths.get("logo.zip").toFile()) + .mapAsync( + 4, + pair -> { + ZipArchiveMetadata metadata = pair.first(); + Path targetFile = target.resolve(metadata.name()); + targetFile.toFile().getParentFile().mkdirs(); //missing error handler + Source fSource = pair.second(); + // create the target directory + return fSource + .runWith(FileIO.toPath(targetFile), system) + .thenApply(io -> Done.done()); + }); + // #sample-zip-read // cleanup new File("logo.zip").delete(); diff --git a/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala b/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala index db41745535..7a2e9b28f4 100644 --- a/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala +++ b/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala @@ -5,8 +5,7 @@ package docs.scaladsl import java.io._ -import java.nio.file.{Path, Paths} - +import java.nio.file.{Files, Path, Paths} import akka.actor.ActorSystem import akka.stream.alpakka.file.ArchiveMetadata import akka.stream.alpakka.file.scaladsl.Archive @@ -113,6 +112,46 @@ class ArchiveSpec 
archiveHelper.unzip(akkaZipped.futureValue).asScala shouldBe inputFiles } + + "unarchive files" in { + val inputFiles = generateInputFiles(5, 100) + val inputStream = filesToStream(inputFiles) + val zipFlow = Archive.zip() + + val akkaZipped: Future[ByteString] = + inputStream + .via(zipFlow) + .runWith(Sink.fold(ByteString.empty)(_ ++ _)) + // #zip-reader + val zipFile = // ??? + // #zip-reader + File.createTempFile("pre", "post") + zipFile.deleteOnExit() + + Source.future(akkaZipped).runWith(FileIO.toPath(zipFile.toPath)).futureValue + + Archive + .zipReader(zipFile) + .map(f => f._1.name -> f._2.runWith(Sink.fold(ByteString.empty)(_ ++ _)).futureValue) + .runWith(Sink.seq) + .futureValue + .toMap shouldBe inputFiles + + // #zip-reader + val target: Path = // ??? + // #zip-reader + Files.createTempDirectory("alpakka-zip-") + // #zip-reader + Archive + .zipReader(zipFile) + .mapAsyncUnordered(4) { + case (metadata, source) => + val targetFile = target.resolve(metadata.name) + targetFile.toFile.getParentFile.mkdirs() //missing error handler + source.runWith(FileIO.toPath(targetFile)) + } + // #zip-reader + } } }