Commit

address feedback onthegomap#2
bbilger committed Aug 15, 2023
1 parent 4e05163 commit 091cfcf
Showing 14 changed files with 84 additions and 58 deletions.
@@ -45,9 +45,5 @@ default byte[] getTile(TileCoord coord) {
*/
TileArchiveMetadata metadata();

default boolean supportsMetadata() {
return true;
}

// TODO access archive metadata
}
@@ -166,7 +166,9 @@ public Arguments applyFallbacks(Arguments arguments) {
}

public enum Format {
MBTILES("mbtiles", false, false),
MBTILES("mbtiles",
false /* TODO mbtiles could support append in the future by using insert statements with an "on conflict"-clause (i.e. upsert) and by creating tables only if they don't exist yet */,
false),
PMTILES("pmtiles", false, false),
CSV("csv", true, true),
PROTO("proto", true, true),
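For illustration, here is a minimal sketch of the upsert idea from the TODO above. This is hypothetical code, not part of this commit; it assumes the xerial sqlite-jdbc driver on the classpath and a SQLite version with upsert support (3.24 or later):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical sketch, not planetiler code: create the tiles table only if it is
// missing and upsert tiles so a re-run can overwrite existing entries instead of failing.
public final class MbtilesAppendSketch {
  public static void main(String[] args) throws SQLException {
    try (Connection conn = DriverManager.getConnection("jdbc:sqlite:/tmp/data/output.mbtiles")) {
      try (var stmt = conn.createStatement()) {
        stmt.execute("""
            CREATE TABLE IF NOT EXISTS tiles (
              zoom_level INTEGER, tile_column INTEGER, tile_row INTEGER, tile_data BLOB,
              PRIMARY KEY (zoom_level, tile_column, tile_row))""");
      }
      // the "on conflict"-clause (upsert) mentioned in the TODO
      try (var insert = conn.prepareStatement("""
          INSERT INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?, ?, ?, ?)
          ON CONFLICT (zoom_level, tile_column, tile_row) DO UPDATE SET tile_data = excluded.tile_data""")) {
        insert.setInt(1, 0); // zoom_level
        insert.setInt(2, 0); // tile_column
        insert.setInt(3, 0); // tile_row
        insert.setBytes(4, new byte[]{0x1f, (byte) 0x8b}); // dummy payload (gzip magic bytes)
        insert.executeUpdate();
      }
    }
  }
}

Note the sketch uses a PRIMARY KEY as the conflict target; the mbtiles spec itself defines a unique index on (zoom_level, tile_column, tile_row), which would work the same way.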
@@ -128,6 +128,11 @@ public static void writeOutput(FeatureGroup features, WriteableTileArchive outpu
* To emit tiles in order, fork the input queue and send features to both the encoder and writer. The writer
* waits on them to be encoded in the order they were received, and the encoder processes them in parallel.
* One batch might take a long time to process, so make the queues very big to avoid idle encoding CPUs.
*
* Note:
* In the future, emitting tiles out of order might be especially interesting when tileWriteThreads>1,
* since when multiple threads/files are involved there's no order that needs to be preserved.
* So some of the restrictions could be lifted then.
*/
WorkQueue<TileBatch> writerQueue = new WorkQueue<>("archive_writer_queue", queueSize, 1, stats);
encodeBranch = pipeline
@@ -138,13 +143,11 @@ public static void writeOutput(FeatureGroup features, WriteableTileArchive outpu
writerEnqueuer.accept(batch); // also send immediately to writer
});
writerQueue.close();
// TODO can this restriction be lifted when tileWriteThreads>1 ?
// use only 1 thread since readFeaturesAndBatch needs to be single-threaded
}, 1)
.addBuffer("reader_queue", queueSize)
.sinkTo("encode", processThreads, writer::tileEncoderSink);

// TODO can this restriction be lifted when tileWriteThreads>1 ? or is it already lifted now that we have multiple writer threads?
// the tile writer will wait on the result of each batch to ensure tiles are written in order
writeBranch = pipeline.readFromQueue(writerQueue)
.sinkTo("write", tileWriteThreads, writer::tileWriter);
@@ -889,6 +889,13 @@ public Map<String, String> getAll() {
* specification</a>
*/
public Metadata set(TileArchiveMetadata tileArchiveMetadata) {

final TileCompression tileCompression = tileArchiveMetadata.tileCompression();
if (tileCompression != null && tileCompression != TileCompression.GZIP) {
LOGGER.warn("will use {} for tile compression, but the mbtiles specification actually requires gzip",
tileCompression.id());
}

var map = new LinkedHashMap<>(tileArchiveMetadata.toMap());

setMetadata(TileArchiveMetadata.FORMAT_KEY, tileArchiveMetadata.format());
@@ -58,14 +58,14 @@
* mkfifo /tmp/data/output_transformed.csv
* # prefix hex-data with '\x' for the postgres import
* cat /tmp/data/output_raw.csv | sed -r 's/^([0-9]+,)([0-9]+,)([0-9]+,)(.*)$/\1\2\3\\x\4/' > /tmp/data/output_transformed.csv
* # now run planetiler with the options --append --output=/tmp/data/output.csv --csv_binary_encoding=hex
* # now run planetiler with the options --append --output=/tmp/data/output_raw.csv --csv_binary_encoding=hex
* ...create tile(s) table
* postgres=# \copy tiles(tile_column, tile_row, zoom_level, tile_data) from /tmp/data/output_raw.csv DELIMITER ',' CSV;
* postgres=# \copy tiles(tile_column, tile_row, zoom_level, tile_data) from /tmp/data/output_transformed.csv DELIMITER ',' CSV;
* </pre>
*
* Check {@link WritableStreamArchive} to see how to write to multiple files. This can be used to parallelize uploads.
* Check {@link WriteableStreamArchive} to see how to write to multiple files. This can be used to parallelize uploads.
*/
public final class WriteableCsvArchive extends WritableStreamArchive {
public final class WriteableCsvArchive extends WriteableStreamArchive {

static final String OPTION_COLUMN_SEPARATOR = "column_separator";
static final String OPTION_LINE_SEPARTATOR = "line_separator";
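As a counterpart to the producer pipeline shown above, here is a minimal consumer sketch for this CSV stream format. Hypothetical code, not part of this commit; it assumes the default comma separator, the x,y,z,data column order implied by the \copy column list above, and base64-encoded tile data (as opposed to the hex encoding used in the postgres example):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical consumer sketch: one tile per line, columns x,y,z,data with the
// tile bytes base64-encoded.
public final class CsvArchiveReaderSketch {
  public static void main(String[] args) throws IOException {
    try (var reader = new BufferedReader(new FileReader("/tmp/data/output.csv", StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        String[] columns = line.split(",", 4); // limit 4 so the data column is never split
        int x = Integer.parseInt(columns[0]);
        int y = Integer.parseInt(columns[1]);
        int z = Integer.parseInt(columns[2]);
        byte[] tileData = Base64.getDecoder().decode(columns[3]);
        System.out.printf("tile %d/%d/%d: %d bytes%n", z, x, y, tileData.length);
      }
    }
  }
}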
@@ -36,7 +36,7 @@
* Writes JSON-serialized tile data as well as metadata into file(s). The entries are of type
* {@link WriteableJsonStreamArchive.Entry} and are separated by newlines (by default).
*/
public final class WriteableJsonStreamArchive extends WritableStreamArchive {
public final class WriteableJsonStreamArchive extends WriteableStreamArchive {

private static final Logger LOGGER = LoggerFactory.getLogger(WriteableJsonStreamArchive.class);

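A minimal reading sketch for this format, mirroring the Jackson-based helper the tests below add for newline-delimited JSON. Hypothetical code, not part of this commit; it assumes the default newline separator and the entry shape visible in the tests ({"type":"tile","x":...,"y":...,"z":...,"encodedData":"..."}):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;

// Hypothetical consumer sketch: iterate newline-delimited JSON entries with Jackson's MappingIterator.
public final class JsonArchiveReaderSketch {
  public static void main(String[] args) throws IOException {
    var reader = new ObjectMapper().readerFor(JsonNode.class);
    try (var entries = reader.readValues(new File("/tmp/data/output.json"))) {
      while (entries.hasNext()) {
        JsonNode entry = entries.next();
        if ("tile".equals(entry.path("type").asText())) {
          System.out.printf("tile %d/%d/%d%n",
            entry.path("z").asInt(), entry.path("x").asInt(), entry.path("y").asInt());
        }
      }
    }
  }
}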
@@ -17,8 +17,24 @@
/**
* Writes protobuf-serialized tile data as well as metadata into file(s). The messages are of type
* {@link StreamArchiveProto.Entry} and are length-delimited.
* <p>
* Custom plugins/integrations should prefer this format since, being binary, it's the fastest to write
* and read, and once set up, it should also be the simplest to use since the models and the parsing code are generated.
* It's also the most stable and straightforward format with regard to schema evolution.
* <p>
* In Java the stream could be read like this:
*
* <pre>
* // note: do not use nio (Files.newInputStream) for pipes
* try (var in = new FileInputStream(...)) {
* StreamArchiveProto.Entry entry;
* while ((entry = StreamArchiveProto.Entry.parseDelimitedFrom(in)) != null) {
* ...
* }
* }
* </pre>
*/
public final class WriteableProtoStreamArchive extends WritableStreamArchive {
public final class WriteableProtoStreamArchive extends WriteableStreamArchive {

private WriteableProtoStreamArchive(Path p, StreamArchiveConfig config) {
super(p, config);
@@ -76,7 +92,7 @@ private static StreamArchiveProto.Metadata toExportData(TileArchiveMetadata meta
setIfNotNull(metaDataBuilder::setZoom, metadata.zoom());
setIfNotNull(metaDataBuilder::setMinZoom, metadata.minzoom());
setIfNotNull(metaDataBuilder::setMaxZoom, metadata.maxzoom());
StreamArchiveProto.TileCompression tileCompression = switch (metadata.tileCompression()) {
final StreamArchiveProto.TileCompression tileCompression = switch (metadata.tileCompression()) {
case GZIP -> StreamArchiveProto.TileCompression.TILE_COMPRESSION_GZIP;
case NONE -> StreamArchiveProto.TileCompression.TILE_COMPRESSION_NONE;
case UNKNWON -> throw new IllegalArgumentException("should not produce \"UNKNOWN\" compression");
@@ -117,13 +133,13 @@ private static StreamArchiveProto.CoordinateXY toExportData(CoordinateXY coord)
private static StreamArchiveProto.VectorLayer toExportData(VectorLayer vectorLayer) {
final var builder = StreamArchiveProto.VectorLayer.newBuilder();
builder.setId(vectorLayer.id());
vectorLayer.fields().entrySet().forEach(e -> {
var exportType = switch (e.getValue()) {
vectorLayer.fields().forEach((key, value) -> {
var exportType = switch (value) {
case NUMBER -> StreamArchiveProto.VectorLayer.FieldType.FIELD_TYPE_NUMBER;
case BOOLEAN -> StreamArchiveProto.VectorLayer.FieldType.FIELD_TYPE_BOOLEAN;
case STRING -> StreamArchiveProto.VectorLayer.FieldType.FIELD_TYPE_STRING;
};
builder.putFields(e.getKey(), exportType);
builder.putFields(key, exportType);
});
vectorLayer.description().ifPresent(builder::setDescription);
vectorLayer.minzoom().ifPresent(builder::setMinZoom);
@@ -32,7 +32,7 @@
* # now run planetiler with the options --append --output=/tmp/data/output.csv --tile_write_threads=3
* </pre>
*/
abstract class WritableStreamArchive implements WriteableTileArchive {
abstract class WriteableStreamArchive implements WriteableTileArchive {

private final OutputStream primaryOutputStream;
private final OutputStreamSupplier outputStreamFactory;
@@ -41,14 +41,14 @@ abstract class WritableStreamArchive implements WriteableTileArchive {

private final AtomicInteger tileWriterCounter = new AtomicInteger(0);

private WritableStreamArchive(OutputStreamSupplier outputStreamFactory, StreamArchiveConfig config) {
private WriteableStreamArchive(OutputStreamSupplier outputStreamFactory, StreamArchiveConfig config) {
this.outputStreamFactory = outputStreamFactory;
this.config = config;

this.primaryOutputStream = outputStreamFactory.newOutputStream(0);
}

protected WritableStreamArchive(Path p, StreamArchiveConfig config) {
protected WriteableStreamArchive(Path p, StreamArchiveConfig config) {
this(new FileOutputStreamSupplier(p, config.appendToFile()), config);
}

1 change: 0 additions & 1 deletion planetiler-core/src/main/proto/stream_archive_proto.proto
@@ -51,7 +51,6 @@ message Envelope {
double max_y = 4;
}

// TODO can we just use int here?
message CoordinateXY {
double x = 1;
double y = 2;
@@ -1860,7 +1860,13 @@ public void processFeature(SourceFeature source, FeatureCollector features) {
assertEquals(11, tileMap.size(), "num tiles");
assertEquals(2146, features, "num buildings");

if (db.supportsMetadata()) {
final boolean checkMetadata = switch (format) {
case MBTILES -> true;
case PMTILES -> true;
default -> db.metadata() != null;
};

if (checkMetadata) {
assertSubmap(Map.of(
"planetiler:version", BuildInfo.get().version(),
"planetiler:osm:osmosisreplicationtime", "2021-04-21T20:21:46Z",
@@ -24,6 +24,8 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.locationtech.jts.geom.CoordinateXY;
import org.locationtech.jts.geom.Envelope;

@@ -223,11 +225,12 @@ void testRoundtripMetadata() throws IOException {
));
}

@Test
void testRoundtripMetadataMinimal() throws IOException {
@ParameterizedTest
@EnumSource(value = TileCompression.class, names = {"GZIP", "NONE"})
void testRoundtripMetadataMinimal(TileCompression tileCompression) throws IOException {
roundTripMetadata(
new TileArchiveMetadata(null, null, null, null, null, null, null, null, null, null, null, null, Map.of(),
TileCompression.GZIP),
tileCompression),
new TileArchiveMetadata(null, null, null, null, null, null,
new Envelope(-180, 180, -85.0511287, 85.0511287),
new CoordinateXY(0, 0),
Expand All @@ -236,7 +239,7 @@ void testRoundtripMetadataMinimal() throws IOException {
15,
null,
Map.of(),
TileCompression.GZIP
tileCompression
)
);
}
@@ -116,9 +116,4 @@ public TileArchiveMetadata metadata() {
return metadata;
}

@Override
public boolean supportsMetadata() {
return metadata != null;
}

}
@@ -7,18 +7,15 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.CsvSource;

class StreamArchiveUtilsTest {

@ParameterizedTest
@ArgumentsSource(GetEscpacedStringTestArgsProvider.class)
@CsvSource(value = {"a,a", "'a',a", "' ',$ $", "'\\n',$\n$"}, quoteCharacter = '$')
void testGetEscpacedString(String in, String out) {

final Arguments options = Arguments.of(Map.of("key", in));
@@ -34,21 +31,4 @@ void testConstructIndexedPath(@TempDir Path tempDir) {
assertEquals(tempDir.resolve("base.test" + 1), StreamArchiveUtils.constructIndexedPath(base, 1));
assertEquals(tempDir.resolve("base.test" + 13), StreamArchiveUtils.constructIndexedPath(base, 13));
}

private static class GetEscpacedStringTestArgsProvider implements ArgumentsProvider {

@Override
public Stream<? extends org.junit.jupiter.params.provider.Arguments> provideArguments(ExtensionContext context) {
return Stream.of(
argsOf("a", "a"),
argsOf("'a'", "a"),
argsOf("' '", " "),
argsOf("'\\n'", "\n")
);
}

private static org.junit.jupiter.params.provider.Arguments argsOf(String in, String out) {
return org.junit.jupiter.params.provider.Arguments.of(in, out);
}
}
}
@@ -1,7 +1,11 @@
package com.onthegomap.planetiler.stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.onthegomap.planetiler.archive.TileArchiveMetadata;
import com.onthegomap.planetiler.archive.TileCompression;
@@ -10,6 +14,7 @@
import com.onthegomap.planetiler.geo.TileCoord;
import com.onthegomap.planetiler.util.LayerStats;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
@@ -101,7 +106,7 @@ void testWriteToSingleFile(@TempDir Path tempDir) throws IOException {
archive.finish(minMetadataIn);
}

assertEquals(
assertEqualsDelimitedJson(
"""
{"type":"initialization","metadata":%s}
{"type":"tile","x":0,"y":0,"z":0,"encodedData":"AA=="}
@@ -142,7 +147,7 @@ void testWriteToMultipleFiles(@TempDir Path tempDir) throws IOException {
archive.finish(maxMetadataIn);
}

assertEquals(
assertEqualsDelimitedJson(
"""
{"type":"initialization","metadata":%s}
{"type":"tile","x":11,"y":12,"z":1,"encodedData":"AA=="}
@@ -152,15 +157,15 @@
Files.readString(csvFilePrimary)
);

assertEquals(
assertEqualsDelimitedJson(
"""
{"type":"tile","x":31,"y":32,"z":3,"encodedData":"Ag=="}
{"type":"tile","x":41,"y":42,"z":4,"encodedData":"Aw=="}
""",
Files.readString(csvFileSecondary)
);

assertEquals(
assertEqualsDelimitedJson(
"""
{"type":"tile","x":51,"y":52,"z":5,"encodedData":"BA=="}
""",
@@ -202,6 +207,8 @@ void testRootValueSeparator(@TempDir Path tempDir) throws IOException {
.replace('\n', ' ');

testTileOptions(tempDir, config, expectedJson);

assertFalse(Files.readString(tempDir.resolve("mbtiles.json")).contains("\n"));
}

private void testTileOptions(Path tempDir, StreamArchiveConfig config, String expectedJson) throws IOException {
@@ -217,9 +224,21 @@ private void testTileOptions(Path tempDir, StreamArchiveConfig config, String ex
archive.finish(maxMetadataIn);
}

assertEquals(expectedJson, Files.readString(csvFile));
assertEqualsDelimitedJson(expectedJson, Files.readString(csvFile));

assertEquals(Set.of(csvFile), Files.list(tempDir).collect(Collectors.toUnmodifiableSet()));
}

private static void assertEqualsDelimitedJson(String expectedJson, String actualJson) {
assertEquals(readDelimitedNodes(expectedJson), readDelimitedNodes(actualJson));
}

private static List<JsonNode> readDelimitedNodes(String json) {
try {
return ImmutableList.copyOf(new ObjectMapper().readerFor(JsonNode.class).readValues(json));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

}
