diff --git a/build.sbt b/build.sbt index e297024..963fca8 100644 --- a/build.sbt +++ b/build.sbt @@ -7,6 +7,8 @@ val scalaMacrosVersion = "2.1.1" val slf4jVersion = "1.7.29" val scalatestVersion = "3.0.8" val mockitoVersion = "2.28.2" +val circe = "0.11.1" +val igluCore = "0.5.0" lazy val compilerOptions = Seq( "-target:jvm-1.8", @@ -58,6 +60,9 @@ lazy val root: Project = project "com.spotify" %% "scio-core" % scioVersion, "com.spotify" %% "scio-test" % scioVersion % Test, "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion, + "io.circe" %% "circe-core" % circe, + "io.circe" %% "circe-parser" % circe, + "com.snowplowanalytics" %% "iglu-core-circe" % igluCore, "org.slf4j" % "slf4j-simple" % slf4jVersion, "org.scalatest" %% "scalatest" % scalatestVersion % Test, "org.mockito" % "mockito-core" % mockitoVersion % Test diff --git a/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/CloudStorageLoader.scala b/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/CloudStorageLoader.scala index 9b84d37..fbca63f 100644 --- a/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/CloudStorageLoader.scala +++ b/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/CloudStorageLoader.scala @@ -14,18 +14,28 @@ */ package com.snowplowanalytics.storage.googlecloudstorage.loader +import io.circe.parser.parse + import com.spotify.scio._ -import org.apache.beam.sdk.io.{Compression, FileBasedSink, TextIO} + +import org.joda.time.Duration + +import org.apache.beam.sdk.coders.StringUtf8Coder +import org.apache.beam.sdk.io.FileIO.Write.FileNaming +import org.apache.beam.sdk.io.{Compression, FileBasedSink, FileIO, TextIO} import org.apache.beam.sdk.io.fs.ResourceId import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO -import org.apache.beam.sdk.options.PipelineOptionsFactory +import org.apache.beam.sdk.options.{ValueProvider, PipelineOptionsFactory} import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider import org.apache.beam.sdk.transforms.SerializableFunction import org.apache.beam.sdk.transforms.windowing.{FixedWindows, Window} -import org.joda.time.Duration + +import com.snowplowanalytics.iglu.core._ +import com.snowplowanalytics.iglu.core.circe.implicits._ /** Dataflow job outputting the content from a Pubsub subscription to a Cloud Storage bucket. */ object CloudStorageLoader { + def main(args: Array[String]): Unit = { PipelineOptionsFactory.register(classOf[Options]) val options = PipelineOptionsFactory @@ -41,37 +51,87 @@ object CloudStorageLoader { def run(options: Options): Unit = { val sc = ScioContext(options) + val outputDirectory = options.getOutputDirectory + val outputFileNamePrefix = options.getOutputFilenamePrefix + val shardTemplate = options.getShardTemplate + val outputFilenameSuffix = options.getOutputFilenameSuffix + val dateFormat = options.getDateFormat + val partitionErrorDirectory = options.getPartitionErrorDirectory() + val inputIO = PubsubIO.readStrings().fromSubscription(options.getInputSubscription) - val outputIO = TextIO.write() + + val input = sc + .customInput("input", inputIO) + .applyTransform( + Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration.toLong))) + ).withName("windowed") + + if (options.getPartitionBySchema()) { + // Partition output according to row type + val outputDynamic = FileIO.writeDynamic[String, String]() + .by(getRowType(_, partitionErrorDirectory).getName) + .via(TextIO.sink()) + .to(outputDirectory.get()) + .withNumShards(options.getNumShards) + .withCompression(getCompression(options.getCompression)) + .withDestinationCoder(StringUtf8Coder.of()) + .withNaming(new SerializableFunction[String, FileNaming] { + // Create FileNaming for partition of window which + // partitioned according to row type + override def apply(rowType: String): FileNaming = + WindowedFilenamePolicy( + None, + outputFileNamePrefix, + shardTemplate, + outputFilenameSuffix, + dateFormat, + Some(rowType) + ) + }) + input.saveAsCustomOutput("output", outputDynamic) + } else { + // Output to same directory without partitioning + // according to row type + val outputIO = TextIO.write() .withWindowedWrites .withNumShards(options.getNumShards) .withWritableByteChannelFactory( FileBasedSink.CompressionType.fromCanonical(getCompression(options.getCompression))) .withTempDirectory(NestedValueProvider.of( - options.getOutputDirectory, + outputDirectory, new SerializableFunction[String, ResourceId] { def apply(input: String): ResourceId = FileBasedSink.convertToFileResourceIfPossible(input) } )) .to(WindowedFilenamePolicy( - options.getOutputDirectory, - options.getOutputFilenamePrefix, - options.getShardTemplate, - options.getOutputFilenameSuffix + Some(outputDirectory), + outputFileNamePrefix, + shardTemplate, + outputFilenameSuffix, + dateFormat, + None )) - - - sc - .customInput("input", inputIO) - .applyTransform( - Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration.toLong))) - ).withName("windowed") - .saveAsCustomOutput("output", outputIO) + input.saveAsCustomOutput("output", outputIO) + } sc.close() } + /** + * Find type of the row according to its schema key + * @param row string to find the type of it + * @return row type of given string + */ + private[loader] def getRowType(row: String, partitionErrorDir: ValueProvider[String]): RowType = + parse(row) match { + case Left(_) => RowType.PartitionError(partitionErrorDir.get) + case Right(json) => SchemaKey.extract(json).fold( + _ => RowType.PartitionError(partitionErrorDir.get), + k => RowType.SelfDescribing(k) + ) + } + /** * Tries to parse a string as a [[Compression]], falls back to uncompressed. * @param compression string to parse diff --git a/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/Options.scala b/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/Options.scala index 517e8b0..9cb6598 100644 --- a/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/Options.scala +++ b/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/Options.scala @@ -35,6 +35,11 @@ trait Options extends PipelineOptions with StreamingOptions { def getOutputFilenamePrefix: ValueProvider[String] def setOutputFilenamePrefix(value: ValueProvider[String]): Unit + @Description("Date format") + @Default.String("YYYY/MM/dd/HH") + def getDateFormat: ValueProvider[String] + def setDateFormat(value: ValueProvider[String]): Unit + @Description("The shard template which will be part of the filennams") @Default.String("-W-P-SSSSS-of-NNNNN") def getShardTemplate: ValueProvider[String] @@ -59,4 +64,14 @@ trait Options extends PipelineOptions with StreamingOptions { @Default.Integer(0) def getNumShards: Int def setNumShards(value: Int): Unit + + @Description("Partition output according to schema") + @Default.Boolean(false) + def getPartitionBySchema(): Boolean + def setPartitionBySchema(value: Boolean): Unit + + @Description("The directory for rows which gives error during type partition") + @Default.String("") + def getPartitionErrorDirectory(): ValueProvider[String] + def setPartitionErrorDirectory(value: ValueProvider[String]): Unit } diff --git a/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/RowType.scala b/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/RowType.scala new file mode 100644 index 0000000..7b0ff67 --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/RowType.scala @@ -0,0 +1,26 @@ +package com.snowplowanalytics.storage.googlecloudstorage.loader + +import com.snowplowanalytics.iglu.core.SchemaKey + +/** Type of row which determined according to schema of self describing data */ +sealed trait RowType extends Product with Serializable { + def getName(): String +} + +object RowType { + + /** Represents cases where row type could not be determined + * since either row is not valid json or it is not self + * describing json + */ + case class PartitionError(errorDir: String) extends RowType { + override def getName(): String = errorDir + } + + /** Represents cases where type of row can be determined successfully + * e.g. does have proper schema key + */ + case class SelfDescribing(schemaKey: SchemaKey) extends RowType { + override def getName(): String = s"${schemaKey.vendor}.${schemaKey.name}" + } +} diff --git a/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/WindowedFilenamePolicy.scala b/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/WindowedFilenamePolicy.scala index 531624d..940900f 100644 --- a/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/WindowedFilenamePolicy.scala +++ b/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/WindowedFilenamePolicy.scala @@ -14,32 +14,38 @@ */ package com.snowplowanalytics.storage.googlecloudstorage.loader -import org.apache.beam.sdk.io.DefaultFilenamePolicy -import org.apache.beam.sdk.io.FileBasedSink.{FilenamePolicy, OutputFileHints} -import org.apache.beam.sdk.io.FileSystems +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy +import org.apache.beam.sdk.io.{DefaultFilenamePolicy, FileSystems, Compression} +import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints +import org.apache.beam.sdk.io.FileIO.Write.FileNaming import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions import org.apache.beam.sdk.io.fs.ResourceId -import org.apache.beam.sdk.options.ValueProvider import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider -import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, IntervalWindow, PaneInfo} +import org.apache.beam.sdk.options.ValueProvider +import org.apache.beam.sdk.transforms.windowing.{IntervalWindow, PaneInfo, BoundedWindow} import org.joda.time.DateTime import org.joda.time.format.DateTimeFormat /** * Case class providing a policy on how the output files will be named in the output bucket. * It supports only windowed files. - * @param outputDirectory Cloud Storage directory to output to, must end with a / + * @param outputDirectory Cloud Storage directory to output to * @param outputFilenamePrefix the prefix with which the filenames will be prepended * @param shardTemplate the template controlling how shard numbers will be incorporated into * filenames * @param outputFilenameSuffix the suffix with which the filenames will be appended + * @param dateTemplate the template controlling how date directory will be constructed + * @param rowType row type to create directory which rows with same type to put under */ final case class WindowedFilenamePolicy( - outputDirectory: ValueProvider[String], + outputDirectory: Option[ValueProvider[String]], outputFilenamePrefix: ValueProvider[String], shardTemplate: ValueProvider[String], - outputFilenameSuffix: ValueProvider[String] -) extends FilenamePolicy { + outputFilenameSuffix: ValueProvider[String], + dateTemplate: ValueProvider[String], + rowType: Option[String] +) extends FilenamePolicy with FileNaming { + /** Generates a filename from window information, fill possible date templates. */ override def windowedFilename( shardNumber: Int, @@ -48,8 +54,13 @@ final case class WindowedFilenamePolicy( paneInfo: PaneInfo, outputFileHints: OutputFileHints ): ResourceId = { - val outputFile = resolveWithDateTemplates(outputDirectory, window) - .resolve(outputFilenamePrefix.get, StandardResolveOptions.RESOLVE_FILE) + val outputFile = resolveWithDateTemplates( + outputDirectory.map(_.get), + rowType, + window, + dateTemplate.get + ).resolve(outputFilenamePrefix.get, StandardResolveOptions.RESOLVE_FILE) + val policy = DefaultFilenamePolicy.fromStandardParameters( StaticValueProvider.of(outputFile), shardTemplate.get, @@ -67,23 +78,44 @@ final case class WindowedFilenamePolicy( ): ResourceId = throw new UnsupportedOperationException("This policy only supports windowed files") + override def getFilename( + window: BoundedWindow, + pane: PaneInfo, + numShards: Int, + shardIndex: Int, + compression: Compression + ): String = + windowedFilename(shardIndex, numShards, window, pane, new OutputFileHints { + override def getMimeType: String = null + override def getSuggestedFilenameSuffix: String = "" + }).toString.stripPrefix("/") ++ compression.getSuggestedSuffix + /** * Fill the date templates with actual time information from the window we are in. - * @param outputDirectory directory possibly containing date templates + * @param outputDirectory Cloud Storage directory to output to + * @param rowType row type to create directory which rows with same type to put under * @param window time window we are currently in + * @param dateTemplate the template controlling how date directory will be constructed * @return a resource id with the date templated in */ private def resolveWithDateTemplates( - outputDirectory: ValueProvider[String], - window: BoundedWindow + outputDirectory: Option[String], + rowType: Option[String], + window: BoundedWindow, + dateTemplate: String ): ResourceId = { - val outputDir = FileSystems.matchNewResource(outputDirectory.get, isDirectory = true) - + val outputDir = (outputDirectory, rowType) match { + case (Some(o), Some(t)) => FileSystems.matchNewResource(o, isDirectory = true) + .resolve(t, StandardResolveOptions.RESOLVE_DIRECTORY) + case (Some(o), None) => FileSystems.matchNewResource(o, isDirectory = true) + case (None, Some(t)) => FileSystems.matchNewResource(t, isDirectory = true) + case (None, None) => FileSystems.matchNewResource("", isDirectory = true) + } window match { case w: IntervalWindow => val windowEndTime = w.end.toDateTime - val outputPath = dateFormat(windowEndTime, outputDir.toString) - FileSystems.matchNewResource(outputPath, isDirectory = true) + val dateStr = dateFormat(windowEndTime, dateTemplate) + outputDir.resolve(dateStr, StandardResolveOptions.RESOLVE_DIRECTORY) case _ => outputDir } } @@ -102,17 +134,3 @@ final case class WindowedFilenamePolicy( } .fold(identity[String] _)(_ andThen _)(template) } - -object WindowedFilenamePolicy { - def apply( - outputDirectory: String, - outputFilenamePrefix: String, - shardTemplate: String, - outputFilenameSuffix: String - ): WindowedFilenamePolicy = WindowedFilenamePolicy( - StaticValueProvider.of(outputDirectory), - StaticValueProvider.of(outputFilenamePrefix), - StaticValueProvider.of(shardTemplate), - StaticValueProvider.of(outputFilenameSuffix) - ) -} diff --git a/src/test/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/CloudStorageLoaderSpec.scala b/src/test/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/CloudStorageLoaderSpec.scala index fc1c0eb..504b33a 100644 --- a/src/test/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/CloudStorageLoaderSpec.scala +++ b/src/test/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/CloudStorageLoaderSpec.scala @@ -16,16 +16,50 @@ package com.snowplowanalytics.storage.googlecloudstorage.loader import com.spotify.scio.io.CustomIO import com.spotify.scio.testing._ +import com.spotify.scio.io.CustomIO +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider class CloudStorageLoaderSpec extends PipelineSpec { val expected = (1 to 10).map(_.toString) + val errorDir = "err_dir" "CloudStorageLoader" should "output a file" in { val sub = "projects/project/subscriptions/sub" JobTest[CloudStorageLoader.type] - .args(s"--inputSubscription=${sub}", "--outputDirectory=gs://out-dir/") + .args(s"--inputSubscription=${sub}", "--outputDirectory=gs://out-dir/", "--partitionBySchema=true") .input(CustomIO[String]("input"), expected) .output(CustomIO[String]("output"))(_ should containInAnyOrder(expected)) .run() } + + "getRowType" should "return parsing error when given json is not valid" in { + val invalidJson = + """ + |{ + | "key": "value + |} + """.stripMargin + CloudStorageLoader.getRowType(invalidJson, StaticValueProvider.of(errorDir)) should equal(RowType.PartitionError(errorDir)) + } + + "getRowType" should "return NonSelfDescribing as RowType when given json is not self describing json" in { + val nonSelfDescribingJson = + """ + |{ + | "key": "value" + |} + """.stripMargin + CloudStorageLoader.getRowType(nonSelfDescribingJson, StaticValueProvider.of(errorDir)) should equal(RowType.PartitionError(errorDir)) + } + + "getRowType" should "return SelfDescribing as RowType when given json is self describing json" in { + val json = + """ + |{ + | "schema": "iglu:com.acme2/example1/jsonschema/2-0-1", + | "data": "data3" + |} + """.stripMargin + CloudStorageLoader.getRowType(json, StaticValueProvider.of(errorDir)).getName should equal("com.acme2.example1") + } } diff --git a/src/test/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/WindowedFilenamePolicySpec.scala b/src/test/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/WindowedFilenamePolicySpec.scala index ad5c906..520be73 100644 --- a/src/test/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/WindowedFilenamePolicySpec.scala +++ b/src/test/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/WindowedFilenamePolicySpec.scala @@ -18,12 +18,13 @@ import java.io.File import java.nio.file.Files import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints -import org.apache.beam.sdk.io.LocalResources -import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions -import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, IntervalWindow, PaneInfo} +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider +import org.apache.beam.sdk.transforms.windowing.{IntervalWindow, PaneInfo, BoundedWindow} + import org.joda.time.DateTime import org.mockito.Mockito.when + import org.scalatest.FreeSpec import org.scalatest.Matchers._ import org.scalatestplus.mockito.MockitoSugar @@ -35,41 +36,123 @@ class WindowedFilenamePolicySpec extends FreeSpec with MockitoSugar { } "the WindowedFilenamePolicy" - { - "make a windowedFilename function available" - { + "makes a windowedFilename function available" - { "which produces a windowed filename" in { - val outputDirectory = LocalResources.fromFile(tempDir(), isDirectory = true) - .resolve("WindowedFilenamePolicy", StandardResolveOptions.RESOLVE_DIRECTORY) + val outputDirectoryStr = "outputDirectory" + val filenamePrefixStr = "file-prefix" + val filenameSuffixStr = ".pdf" + val rowTypeStr = "rowType" + val outputDirectory = Some(StaticValueProvider.of(outputDirectoryStr)) val window = mock[BoundedWindow] val paneInfo = PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 0, 0) - val policy = WindowedFilenamePolicy(outputDirectory.toString, "out", "-SSS-NNN", ".txt") + val policy = WindowedFilenamePolicy( + outputDirectory, + StaticValueProvider.of(filenamePrefixStr), + StaticValueProvider.of("-SSS-NNN"), + StaticValueProvider.of(filenameSuffixStr), + StaticValueProvider.of("YYYY/MM/dd/HH"), + Some(rowTypeStr) + ) val filename = policy.windowedFilename(1, 1, window, paneInfo, TestOutputFileHints) - filename.getFilename shouldEqual "out-001-001.txt" + filename.toString should endWith(s"/$outputDirectoryStr/$rowTypeStr/$filenamePrefixStr-001-001$filenameSuffixStr") } - "which produces a dynamic filename" in { - val outputDirectory = LocalResources.fromFile(tempDir(), isDirectory = true) - .resolve("YYYY/MM/dd/HH", StandardResolveOptions.RESOLVE_DIRECTORY) - val paneInfo = PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 0, 0) + "which produces a correct file name with interval window" in { + val outputDirectoryStr = "outputDirectory" + val filenamePrefixStr = "file-prefix" + val filenameSuffixStr = ".pdf" + val rowTypeStr = "rowType" + val outputDirectory = Some(StaticValueProvider.of(outputDirectoryStr)) val window = mock[IntervalWindow] - val windowBegin = new DateTime(2018, 1, 8, 10, 55, 0).toInstant + val windowBegin = new DateTime(2018, 1, 8, 10, 53, 0).toInstant val windowEnd = new DateTime(2018, 1, 8, 10, 56, 0).toInstant when(window.maxTimestamp).thenReturn(windowEnd) when(window.start).thenReturn(windowBegin) when(window.end).thenReturn(windowEnd) - val policy = WindowedFilenamePolicy(outputDirectory.toString, "out", "-SSS-NNN", ".txt") + val paneInfo = PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 0, 0) + val policy = WindowedFilenamePolicy( + outputDirectory, + StaticValueProvider.of(filenamePrefixStr), + StaticValueProvider.of("-W-P-SSS-NNN"), + StaticValueProvider.of(filenameSuffixStr), + StaticValueProvider.of("YYYY/dd/MM/HH"), + Some(rowTypeStr) + ) + + val filename = policy.windowedFilename(1, 1, window, paneInfo, TestOutputFileHints) + + filename.toString should endWith(s"/$outputDirectoryStr/$rowTypeStr/2018/08/01/10/$filenamePrefixStr-$windowBegin-$windowEnd-pane-0-last-001-001$filenameSuffixStr") + } + "which produces a correct file name when given output directory is None" in { + val filenamePrefixStr = "file-prefix" + val filenameSuffixStr = ".pdf" + val rowTypeStr = "rowType" + val window = mock[BoundedWindow] + val paneInfo = PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 0, 0) + val policy = WindowedFilenamePolicy( + None, + StaticValueProvider.of(filenamePrefixStr), + StaticValueProvider.of("-SSS-NNN"), + StaticValueProvider.of(filenameSuffixStr), + StaticValueProvider.of("YYYY/MM/dd/HH"), + Some(rowTypeStr) + ) + + val filename = policy.windowedFilename(1, 1, window, paneInfo, TestOutputFileHints) + + filename.toString should endWith(s"/$rowTypeStr/$filenamePrefixStr-001-001$filenameSuffixStr") + } + "which produces a correct file name when both given type prefix is None" in { + val outputDirectoryStr = "outputDirectory" + val filenamePrefixStr = "file-prefix" + val filenameSuffixStr = ".pdf" + val outputDirectory = Some(StaticValueProvider.of(outputDirectoryStr)) + val window = mock[BoundedWindow] + val paneInfo = PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 0, 0) + val policy = WindowedFilenamePolicy( + outputDirectory, + StaticValueProvider.of(filenamePrefixStr), + StaticValueProvider.of("-SSS-NNN"), + StaticValueProvider.of(filenameSuffixStr), + StaticValueProvider.of("YYYY/MM/dd/HH"), + None + ) + + val filename = policy.windowedFilename(1, 1, window, paneInfo, TestOutputFileHints) + + filename.toString should endWith(s"/$outputDirectoryStr/$filenamePrefixStr-001-001$filenameSuffixStr") + } + "which produces a correct file name when both given output directory and type prefix is None" in { + val filenamePrefixStr = "file-prefix" + val filenameSuffixStr = ".pdf" + val window = mock[BoundedWindow] + val paneInfo = PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 0, 0) + val policy = WindowedFilenamePolicy( + None, + StaticValueProvider.of(filenamePrefixStr), + StaticValueProvider.of("-SSS-NNN"), + StaticValueProvider.of(filenameSuffixStr), + StaticValueProvider.of("YYYY/MM/dd/HH"), + None + ) val filename = policy.windowedFilename(1, 1, window, paneInfo, TestOutputFileHints) - filename.getCurrentDirectory.toString should endWith("2018/01/08/10/") - filename.getFilename shouldEqual "out-001-001.txt" + filename.toString should endWith(s"/$filenamePrefixStr-001-001$filenameSuffixStr") } } - "make a unwindowedFilename function available" - { + "makes a unwindowedFilename function available" - { "which throws an unsupported operation" in { - val outputDirectory = LocalResources.fromFile(tempDir(), isDirectory = true) - .resolve("WindowedFilenamePolicy", StandardResolveOptions.RESOLVE_DIRECTORY) - val policy = WindowedFilenamePolicy(outputDirectory.toString, "out", "-SSS-NNN", ".txt") + val outputDirectory = Some(StaticValueProvider.of("outputDirectory")) + val policy = WindowedFilenamePolicy( + outputDirectory, + StaticValueProvider.of("out"), + StaticValueProvider.of("-SSS-NNN"), + StaticValueProvider.of(".txt"), + StaticValueProvider.of("YYYY/MM/dd/HH"), + Some("typePrefix") + ) an [UnsupportedOperationException] should be thrownBy policy.unwindowedFilename(1, 1, TestOutputFileHints)