file: unzip introduced (akka#2692)
tg44 authored Aug 25, 2021
1 parent 2539487 commit f9214c8
Showing 7 changed files with 211 additions and 6 deletions.
17 changes: 17 additions & 0 deletions docs/src/main/paradox/file.md
@@ -168,6 +168,8 @@ This example can be found in the @ref:[self-contained example documentation sect

## ZIP Archive

### Writing ZIP archives

The @apidoc[Archive$]
contains a flow for compressing multiple files into one ZIP file.

@@ -182,6 +184,21 @@ Scala
Java
: @@snip [snip](/file/src/test/java/docs/javadsl/ArchiveTest.java) { #sample-zip }
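
For orientation alongside the referenced snippets, here is a minimal, self-contained sketch (not the referenced snippet) of how `Archive.zip()` from the Scala DSL is typically wired; the object name, entry names, contents, and output path are made up for illustration:

```scala
import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.alpakka.file.scaladsl.Archive
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString

object ZipWriteSketch extends App {
  implicit val system: ActorSystem = ActorSystem("zip-write-sketch")

  // Each element pairs the entry's path inside the archive with a source of its bytes.
  val entries = Source(
    List(
      ArchiveMetadata.create("docs/a.txt") -> Source.single(ByteString("hello")),
      ArchiveMetadata.create("docs/b.txt") -> Source.single(ByteString("world"))
    )
  )

  // Archive.zip() emits the archive as a ByteString stream, written here to out.zip.
  entries
    .via(Archive.zip())
    .runWith(FileIO.toPath(Paths.get("out.zip")))
}
```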

### Reading ZIP archives


@apidoc[Archive.zipReader()](Archive$) reads a file in ZIP format, emitting the metadata entry and a `Source` of its bytes for every file in the archive.
Not every emitted file needs to be consumed, and multiple files can be read in parallel (each sub-source seeks into the archive on its own).

The example below reads the incoming file and unzips all entries to the local file system.

Scala
: @@snip [snip](/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala) { #zip-reader }

Java
: @@snip [snip](/file/src/test/java/docs/javadsl/ArchiveTest.java) { #sample-zip-read }
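
As a rough complement to the snippets above, a minimal sketch (Scala DSL) that unpacks every entry into a temporary directory; the `logo.zip` file name and the parallelism of 4 are illustrative choices, and error handling is omitted:

```scala
import java.io.File
import java.nio.file.{Files, Path}

import akka.actor.ActorSystem
import akka.stream.alpakka.file.scaladsl.Archive
import akka.stream.scaladsl.{FileIO, Sink}

object ZipReadSketch extends App {
  implicit val system: ActorSystem = ActorSystem("zip-read-sketch")

  val zipFile = new File("logo.zip") // illustrative input file
  val target: Path = Files.createTempDirectory("unzipped-")

  // Every element is the entry metadata plus a sub-source of that entry's bytes.
  Archive
    .zipReader(zipFile)
    .mapAsyncUnordered(4) {
      case (metadata, bytes) =>
        val targetFile = target.resolve(metadata.name)
        targetFile.toFile.getParentFile.mkdirs() // no error handling, as in the snippet above
        bytes.runWith(FileIO.toPath(targetFile))
    }
    .runWith(Sink.ignore)
}
```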


## TAR Archive

### Writing TAR archives
@@ -0,0 +1,99 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.file.impl.archive

import akka.NotUsed
import akka.annotation.InternalApi
import akka.stream.alpakka.file.ZipArchiveMetadata
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.util.ByteString

import java.io.{File, FileInputStream}
import java.util.zip.{ZipEntry, ZipInputStream}

@InternalApi class ZipEntrySource(n: ZipArchiveMetadata, f: File, chunkSize: Int)
    extends GraphStage[SourceShape[ByteString]] {
  private val out = Outlet[ByteString]("flowOut")
  override val shape: SourceShape[ByteString] =
    SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Each materialization opens its own stream over the archive file.
      val zis = new ZipInputStream(new FileInputStream(f))
      var entry: ZipEntry = null
      val data = new Array[Byte](chunkSize)

      // Advance the ZIP stream until the requested entry is the current one.
      def seek(): Unit = {
        while ({
          entry = zis.getNextEntry()
          entry != null && entry.getName != n.name
        }) {
          zis.closeEntry()
        }
      }

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit = {
            if (entry == null) {
              seek()
              if (entry == null) {
                failStage(new Exception(s"Entry [${n.name}] was not found in the archive"))
                return
              }
            }

            val c = zis.read(data, 0, chunkSize)
            if (c == -1) {
              completeStage()
            } else {
              push(out, ByteString.fromArray(data, 0, c))
            }
          }
        }
      )

      override def postStop(): Unit = {
        super.postStop()
        zis.close()
      }
    }
}

@InternalApi class ZipSource(f: File, chunkSize: Int)
    extends GraphStage[SourceShape[(ZipArchiveMetadata, Source[ByteString, NotUsed])]] {
  private val out = Outlet[(ZipArchiveMetadata, Source[ByteString, NotUsed])]("flowOut")
  override val shape: SourceShape[(ZipArchiveMetadata, Source[ByteString, NotUsed])] =
    SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      val zis = new ZipInputStream(new FileInputStream(f))

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit = {
            val e = zis.getNextEntry
            if (e != null) {
              // Emit the entry metadata together with a sub-source that re-opens the
              // archive and seeks to this entry when it is materialized.
              val n = ZipArchiveMetadata(e.getName)
              zis.closeEntry()
              push(out, n -> Source.fromGraph(new ZipEntrySource(n, f, chunkSize)))
            } else {
              zis.close()
              completeStage()
            }
          }
        }
      )

      override def postStop(): Unit = {
        super.postStop()
        zis.close()
      }
    }
}
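
Since each materialized `ZipEntrySource` opens its own `ZipInputStream` over the file and seeks to its entry, the emitted sub-sources can be run independently and in parallel. A small sketch through the public Scala DSL (the archive name, parallelism, and object name are illustrative):

```scala
import java.io.File

import akka.actor.ActorSystem
import akka.stream.alpakka.file.scaladsl.Archive
import akka.stream.scaladsl.Sink
import akka.util.ByteString

object ParallelEntriesSketch extends App {
  implicit val system: ActorSystem = ActorSystem("zip-parallel-sketch")
  import system.dispatcher

  // Read up to four entries concurrently; each sub-source re-opens the archive on its own.
  Archive
    .zipReader(new File("archive.zip"))
    .mapAsyncUnordered(4) {
      case (metadata, bytes) =>
        bytes
          .runWith(Sink.fold(ByteString.empty)(_ ++ _))
          .map(content => metadata.name -> content.size)
    }
    .runWith(Sink.foreach(println))
}
```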
19 changes: 17 additions & 2 deletions file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala
@@ -5,13 +5,15 @@
package akka.stream.alpakka.file.javadsl

import akka.NotUsed
import akka.stream.alpakka.file.{scaladsl, ArchiveMetadata, TarArchiveMetadata}
import akka.stream.alpakka.file.{scaladsl, ArchiveMetadata, TarArchiveMetadata, ZipArchiveMetadata}
import akka.stream.javadsl.Flow
import akka.util.ByteString
import akka.japi.Pair
import akka.stream.alpakka.file.impl.archive.TarReaderStage
import akka.stream.alpakka.file.impl.archive.{TarReaderStage, ZipSource}
import akka.stream.javadsl.Source

import java.io.File

/**
* Java API.
*/
@@ -26,6 +28,19 @@ object Archive {
.map(func(pair => (pair.first, pair.second.asScala)))
.via(scaladsl.Archive.zip().asJava)

/**
 * Source for reading a ZIP file, emitting the metadata of every entry paired with a source of its bytes.
 */
def zipReader(file: File, chunkSize: Int): Source[Pair[ZipArchiveMetadata, Source[ByteString, NotUsed]], NotUsed] =
  Source
    .fromGraph(new ZipSource(file, chunkSize))
    .map(func {
      case (metadata, source) =>
        Pair(metadata, source.asJava)
    })

/**
 * Source for reading a ZIP file with the default chunk size of 8192 bytes.
 */
def zipReader(file: File): Source[Pair[ZipArchiveMetadata, Source[ByteString, NotUsed]], NotUsed] =
  zipReader(file, 8192)

/**
* Flow for packaging multiple files into one TAR file.
*/
7 changes: 7 additions & 0 deletions file/src/main/scala/akka/stream/alpakka/file/model.scala
@@ -17,6 +17,13 @@ object ArchiveMetadata {
def create(filePath: String): ArchiveMetadata = new ArchiveMetadata(filePath)
}

final case class ZipArchiveMetadata(name: String) {
  def getName(): String = name
}

object ZipArchiveMetadata {
  def create(name: String): ZipArchiveMetadata = ZipArchiveMetadata(name)
}

final class TarArchiveMetadata private (
val filePathPrefix: Option[String],
val filePathName: String,
@@ -5,11 +5,13 @@
package akka.stream.alpakka.file.scaladsl

import akka.NotUsed
import akka.stream.alpakka.file.{ArchiveMetadata, TarArchiveMetadata}
import akka.stream.alpakka.file.impl.archive.{TarArchiveManager, TarReaderStage, ZipArchiveManager}
import akka.stream.alpakka.file.{ArchiveMetadata, TarArchiveMetadata, ZipArchiveMetadata}
import akka.stream.alpakka.file.impl.archive.{TarArchiveManager, TarReaderStage, ZipArchiveManager, ZipSource}
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

import java.io.File

/**
* Scala API.
*/
@@ -21,6 +23,14 @@ object Archive {
def zip(): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] =
ZipArchiveManager.zipFlow()

/**
 * Source for reading a ZIP file, emitting the metadata of every entry paired with a source of its bytes.
 */
def zipReader(file: File, chunkSize: Int): Source[(ZipArchiveMetadata, Source[ByteString, Any]), NotUsed] =
  Source.fromGraph(new ZipSource(file, chunkSize))

/**
 * Source for reading a ZIP file with the default chunk size of 8192 bytes.
 */
def zipReader(file: File): Source[(ZipArchiveMetadata, Source[ByteString, Any]), NotUsed] =
  Source.fromGraph(new ZipSource(file, 8192))

/**
* Flow for packaging multiple files into one TAR file.
*/
18 changes: 18 additions & 0 deletions file/src/test/java/docs/javadsl/ArchiveTest.java
@@ -12,6 +12,7 @@
import akka.stream.Materializer;
import akka.stream.alpakka.file.ArchiveMetadata;
import akka.stream.alpakka.file.TarArchiveMetadata;
import akka.stream.alpakka.file.ZipArchiveMetadata;
import akka.stream.alpakka.file.javadsl.Archive;
import akka.stream.alpakka.file.javadsl.Directory;
import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4;
@@ -106,6 +107,23 @@ public void flowShouldCreateZIPArchive() throws Exception {
Map<String, ByteString> unzip = archiveHelper.unzip(resultFileContent);

assertThat(inputFiles, is(unzip));
Path target = Files.createTempDirectory("alpakka-zip-");

// #sample-zip-read
Archive.zipReader(Paths.get("logo.zip").toFile())
    .mapAsync(
        4,
        pair -> {
          ZipArchiveMetadata metadata = pair.first();
          Path targetFile = target.resolve(metadata.getName());
          // create the target directory (no error handling here)
          targetFile.toFile().getParentFile().mkdirs();
          Source<ByteString, NotUsed> fSource = pair.second();
          return fSource
              .runWith(FileIO.toPath(targetFile), system)
              .thenApply(io -> Done.done());
        });
// #sample-zip-read

// cleanup
new File("logo.zip").delete();
43 changes: 41 additions & 2 deletions file/src/test/scala/docs/scaladsl/ArchiveSpec.scala
@@ -5,8 +5,7 @@
package docs.scaladsl

import java.io._
import java.nio.file.{Path, Paths}

import java.nio.file.{Files, Path, Paths}
import akka.actor.ActorSystem
import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.alpakka.file.scaladsl.Archive
@@ -113,6 +112,46 @@ class ArchiveSpec

archiveHelper.unzip(akkaZipped.futureValue).asScala shouldBe inputFiles
}

"unarchive files" in {
val inputFiles = generateInputFiles(5, 100)
val inputStream = filesToStream(inputFiles)
val zipFlow = Archive.zip()

val akkaZipped: Future[ByteString] =
inputStream
.via(zipFlow)
.runWith(Sink.fold(ByteString.empty)(_ ++ _))
// #zip-reader
val zipFile = // ???
// #zip-reader
File.createTempFile("pre", "post")
zipFile.deleteOnExit()

Source.future(akkaZipped).runWith(FileIO.toPath(zipFile.toPath)).futureValue

Archive
.zipReader(zipFile)
.map(f => f._1.name -> f._2.runWith(Sink.fold(ByteString.empty)(_ ++ _)).futureValue)
.runWith(Sink.seq)
.futureValue
.toMap shouldBe inputFiles

// #zip-reader
val target: Path = // ???
// #zip-reader
Files.createTempDirectory("alpakka-zip-")
// #zip-reader
Archive
.zipReader(zipFile)
.mapAsyncUnordered(4) {
case (metadata, source) =>
val targetFile = target.resolve(metadata.name)
targetFile.toFile.getParentFile.mkdirs() //missing error handler
source.runWith(FileIO.toPath(targetFile))
}
// #zip-reader
}
}
}
