file: unzip introduced #2692

Merged
merged 2 commits into from
Aug 25, 2021
17 changes: 17 additions & 0 deletions docs/src/main/paradox/file.md
@@ -168,6 +168,8 @@ This example can be found in the @ref:[self-contained example documentation sect

## ZIP Archive

### Writing ZIP Archives

The @apidoc[Archive$]
contains a flow for compressing multiple files into one ZIP file.

@@ -182,6 +184,21 @@ Scala
Java
: @@snip [snip](/file/src/test/java/docs/javadsl/ArchiveTest.java) { #sample-zip }

### Reading ZIP archives


@apidoc[Archive.zipReader()](Archive$) reads a file in ZIP format and emits a pair of the metadata entry and a `Source` for every file in the archive.
Not every file has to be consumed, and multiple files can be read in parallel, because every sub-source seeks into the archive on its own.

The example below reads the incoming file and unzips all entries to the local file system.

Scala
: @@snip [snip](/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala) { #zip-reader }

Java
: @@snip [snip](/file/src/test/java/docs/javadsl/ArchiveTest.java) { #sample-zip-read }
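To make the mechanics concrete without the Alpakka wiring, the seek-per-entry idea behind `zipReader` can be sketched with plain `java.util.zip` (an illustrative sketch only, not the Alpakka implementation; the object and method names here are made up):

```scala
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}

object ZipReadSketch {
  // Write a small ZIP file so the sketch is self-contained.
  def writeSampleZip(file: File, entries: Seq[(String, String)]): Unit = {
    val zos = new ZipOutputStream(new FileOutputStream(file))
    try entries.foreach {
      case (name, content) =>
        zos.putNextEntry(new ZipEntry(name))
        zos.write(content.getBytes("UTF-8"))
        zos.closeEntry()
    } finally zos.close()
  }

  // Metadata pass: list the entry names without keeping any entry data.
  def entryNames(file: File): List[String] = {
    val zis = new ZipInputStream(new FileInputStream(file))
    try Iterator.continually(zis.getNextEntry).takeWhile(_ != null).map(_.getName).toList
    finally zis.close()
  }

  // Per-entry pass: re-open the archive and skip forward to the wanted entry,
  // mirroring how each sub-source seeks into the archive on its own.
  def readEntry(file: File, name: String): String = {
    val zis = new ZipInputStream(new FileInputStream(file))
    try {
      var entry = zis.getNextEntry
      while (entry != null && entry.getName != name) {
        zis.closeEntry()
        entry = zis.getNextEntry
      }
      require(entry != null, s"entry $name not found")
      new String(zis.readAllBytes(), "UTF-8")
    } finally zis.close()
  }
}
```

Because each entry is read through its own freshly opened stream, several entries can be read concurrently, at the cost of one pass over the archive per entry.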


## TAR Archive

### Writing TAR archives
@@ -0,0 +1,100 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.file.impl.archive

import akka.NotUsed
import akka.annotation.InternalApi
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.util.ByteString

import java.io.{File, FileInputStream}
import java.util.zip.{ZipEntry, ZipInputStream}

case class ZipArchiveMetadata(name: String)
Member

This class is user-facing API and should preferably live in akka.stream.alpakka.file instead. Add a getName for Java folks.

Contributor Author

Done!
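The exchange above resolves to something like the following sketch of the requested shape (not the merged code itself — in the final PR the class presumably moved out of the `.impl` package):

```scala
// Sketch of the reviewer's request: a user-facing metadata class with an
// explicit accessor for Java callers. (Assumed to live in
// akka.stream.alpakka.file rather than the .impl package after the review.)
final case class ZipArchiveMetadata(name: String) {
  def getName: String = name // Java-friendly accessor
}
```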


@InternalApi class ZipEntrySource(n: ZipArchiveMetadata, f: File, chunkSize: Int)
    extends GraphStage[SourceShape[ByteString]] {
  private val out = Outlet[ByteString]("flowOut")
  override val shape: SourceShape[ByteString] =
    SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      val zis = new ZipInputStream(new FileInputStream(f))
      var entry: ZipEntry = null
      val data = new Array[Byte](chunkSize)

      def seek() = {
        while ({
          entry = zis.getNextEntry()
          entry != null && entry.getName != n.name
        }) {
          zis.closeEntry()
        }
      }

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit = {
            if (entry == null) {
              seek()
              if (entry == null) {
                failStage(new Exception(s"Entry '${n.name}' was not found in the archive"))
                return
              }
            }

            val c = zis.read(data, 0, chunkSize)
            if (c == -1) {
              completeStage()
            } else {
              push(out, ByteString.fromArray(data, 0, c))
            }
          }
        }
      )

      override def postStop(): Unit = {
        super.postStop()
        zis.close()
      }
    }
}

@InternalApi class ZipSource(f: File, chunkSize: Int)
Contributor

@tg44 I see that ZipSource "only" handles Files. But what about this scenario: in a Play app a user uploads a ZIP file, and I want to unpack it directly, without storing it on disk, and stream its contents to e.g. S3 buckets. This could probably be done in a Play body parser. I guess ZipSource would need to accept a Source to achieve that?
What do you think? Would it be somehow possible to adapt your code to support such a scenario?
Thanks!

Contributor Author

The current source works with file seek, because I needed parallel access to the files. Without a File and seeking you have to consume the sub-sources one after another and cannot use parallelism, which in my opinion makes the source hard to use and easy to misuse. (I think the tar flow works like this right now.)
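The seek-free alternative being discussed here can be sketched with plain `java.util.zip` (an illustration of the trade-off, not code from this PR; the names are made up):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}

object StreamingZipSketch {
  // Build a ZIP archive in memory so the sketch is self-contained.
  def zipBytes(entries: Seq[(String, String)]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val zos = new ZipOutputStream(bos)
    try entries.foreach {
      case (name, content) =>
        zos.putNextEntry(new ZipEntry(name))
        zos.write(content.getBytes("UTF-8"))
        zos.closeEntry()
    } finally zos.close()
    bos.toByteArray
  }

  // A non-seekable stream has exactly one cursor: each entry's bytes must be
  // fully consumed before the next entry becomes visible, so entries can only
  // be processed one after another -- no parallelism.
  def readAll(bytes: Array[Byte]): List[(String, String)] = {
    val zis = new ZipInputStream(new ByteArrayInputStream(bytes))
    try Iterator
      .continually(zis.getNextEntry)
      .takeWhile(_ != null)
      .map(e => e.getName -> new String(zis.readAllBytes(), "UTF-8"))
      .toList
    finally zis.close()
  }
}
```

This works on any byte stream (no temp file needed), which is the upload-to-S3 scenario above, but it forces strictly sequential consumption of the entries.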

Contributor

I totally understand your approach. I had a look at the tar reading example now, and I think it would be good to have the same for ZIP reading as well (in addition to your implementation). What also bothers me a bit is that for writing ZIP files, and for reading and writing TAR files, we work with sources and sinks, but for reading ZIP files we only allow Files now.

Contributor Author

Of course we can implement a tar-reading-like API too, I just haven't needed it :) I think you can easily merge the tar reading and ZIP file reading logic. (Sorry, but I'm really short on time nowadays, so I can't contribute that in the next few weeks.)

    extends GraphStage[SourceShape[(ZipArchiveMetadata, Source[ByteString, NotUsed])]] {
  private val out = Outlet[(ZipArchiveMetadata, Source[ByteString, NotUsed])]("flowOut")
  override val shape: SourceShape[(ZipArchiveMetadata, Source[ByteString, NotUsed])] =
    SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      val zis = new ZipInputStream(new FileInputStream(f))

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit = {
            val e = zis.getNextEntry
            if (e != null) {
              val n = ZipArchiveMetadata(e.getName)
              zis.closeEntry()
              push(out, n -> Source.fromGraph(new ZipEntrySource(n, f, chunkSize)))
            } else {
              zis.close()
              completeStage()
            }
          }
        }
      )

      override def postStop(): Unit = {
        super.postStop()
        zis.close()
      }
    }
}
@@ -9,9 +9,11 @@ import akka.stream.alpakka.file.{scaladsl, ArchiveMetadata, TarArchiveMetadata}
import akka.stream.javadsl.Flow
import akka.util.ByteString
import akka.japi.Pair
import akka.stream.alpakka.file.impl.archive.TarReaderStage
import akka.stream.alpakka.file.impl.archive.{TarReaderStage, ZipArchiveMetadata, ZipSource}
import akka.stream.javadsl.Source

import java.io.File

/**
* Java API.
*/
@@ -26,6 +28,19 @@ object Archive {
.map(func(pair => (pair.first, pair.second.asScala)))
.via(scaladsl.Archive.zip().asJava)

  /**
   * Source for reading ZIP files.
   */
  def zipReader(file: File, chunkSize: Int): Source[Pair[ZipArchiveMetadata, Source[ByteString, NotUsed]], NotUsed] =
    Source
      .fromGraph(new ZipSource(file, chunkSize))
      .map(func {
        case (metadata, source) =>
          Pair(metadata, source.asJava)
      })

  def zipReader(file: File): Source[Pair[ZipArchiveMetadata, Source[ByteString, NotUsed]], NotUsed] =
    zipReader(file, 8192)

/**
* Flow for packaging multiple files into one TAR file.
*/
@@ -6,10 +6,18 @@ package akka.stream.alpakka.file.scaladsl

import akka.NotUsed
import akka.stream.alpakka.file.{ArchiveMetadata, TarArchiveMetadata}
import akka.stream.alpakka.file.impl.archive.{TarArchiveManager, TarReaderStage, ZipArchiveManager}
import akka.stream.alpakka.file.impl.archive.{
TarArchiveManager,
TarReaderStage,
ZipArchiveManager,
ZipArchiveMetadata,
ZipSource
}
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

import java.io.File

/**
* Scala API.
*/
@@ -21,6 +29,14 @@ object Archive {
def zip(): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] =
ZipArchiveManager.zipFlow()

  /**
   * Source for reading ZIP files.
   */
  def zipReader(file: File, chunkSize: Int): Source[(ZipArchiveMetadata, Source[ByteString, Any]), NotUsed] =
    Source.fromGraph(new ZipSource(file, chunkSize))

  def zipReader(file: File): Source[(ZipArchiveMetadata, Source[ByteString, Any]), NotUsed] =
    Source.fromGraph(new ZipSource(file, 8192))

/**
* Flow for packaging multiple files into one TAR file.
*/
18 changes: 18 additions & 0 deletions file/src/test/java/docs/javadsl/ArchiveTest.java
@@ -12,6 +12,7 @@
import akka.stream.Materializer;
import akka.stream.alpakka.file.ArchiveMetadata;
import akka.stream.alpakka.file.TarArchiveMetadata;
import akka.stream.alpakka.file.impl.archive.ZipArchiveMetadata;
import akka.stream.alpakka.file.javadsl.Archive;
import akka.stream.alpakka.file.javadsl.Directory;
import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4;
@@ -106,6 +107,23 @@ public void flowShouldCreateZIPArchive() throws Exception {
Map<String, ByteString> unzip = archiveHelper.unzip(resultFileContent);

assertThat(inputFiles, is(unzip));
Path target = Files.createTempDirectory("alpakka-tar-");
Member

Suggested change
Path target = Files.createTempDirectory("alpakka-tar-");
Path target = Files.createTempDirectory("alpakka-zip-");

Contributor Author

Done!


    // #sample-zip-read
    Archive.zipReader(Paths.get("logo.zip").toFile())
        .mapAsync(
            4,
            pair -> {
              ZipArchiveMetadata metadata = pair.first();
              Path targetFile = target.resolve(metadata.name());
              // create the target directory
              targetFile.toFile().getParentFile().mkdirs(); // missing error handler
              Source<ByteString, NotUsed> fSource = pair.second();
              return fSource
                  .runWith(FileIO.toPath(targetFile), system)
                  .thenApply(io -> Done.done());
            });
    // #sample-zip-read

// cleanup
new File("logo.zip").delete();
43 changes: 41 additions & 2 deletions file/src/test/scala/docs/scaladsl/ArchiveSpec.scala
@@ -5,8 +5,7 @@
package docs.scaladsl

import java.io._
import java.nio.file.{Path, Paths}

import java.nio.file.{Files, Path, Paths}
import akka.actor.ActorSystem
import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.alpakka.file.scaladsl.Archive
@@ -113,6 +112,46 @@ class ArchiveSpec

archiveHelper.unzip(akkaZipped.futureValue).asScala shouldBe inputFiles
}

    "unarchive files" in {
      val inputFiles = generateInputFiles(5, 100)
      val inputStream = filesToStream(inputFiles)
      val zipFlow = Archive.zip()

      val akkaZipped: Future[ByteString] =
        inputStream
          .via(zipFlow)
          .runWith(Sink.fold(ByteString.empty)(_ ++ _))

      // #zip-reader
      val zipFile = // ???
        // #zip-reader
        File.createTempFile("pre", "post")
      zipFile.deleteOnExit()

      Source.future(akkaZipped).runWith(FileIO.toPath(zipFile.toPath)).futureValue

      Archive
        .zipReader(zipFile)
        .map(f => f._1.name -> f._2.runWith(Sink.fold(ByteString.empty)(_ ++ _)).futureValue)
        .runWith(Sink.seq)
        .futureValue
        .toMap shouldBe inputFiles

      // #zip-reader
      val target: Path = // ???
        // #zip-reader
        Files.createTempDirectory("alpakka-zip-")
      // #zip-reader
      Archive
        .zipReader(zipFile)
        .mapAsyncUnordered(4) {
          case (metadata, source) =>
            val targetFile = target.resolve(metadata.name)
            targetFile.toFile.getParentFile.mkdirs() // missing error handler
            source.runWith(FileIO.toPath(targetFile))
        }
      // #zip-reader
    }
  }
}
