From 72a7c2ace366a2ebd64f6daf078557090dea1aa3 Mon Sep 17 00:00:00 2001
From: Brandon Liu
Date: Thu, 21 Sep 2023 21:50:55 +0800
Subject: [PATCH] Bbox cleanup (#82)

* better help text for extract bbox arg

* remove downloader; punting on stats
---
 main.go                    |  11 +---
 pmtiles/downloader.go      |  80 -------------------------
 pmtiles/downloader_test.go |  33 -----------
 pmtiles/stats.go           | 118 -------------------------------------
 4 files changed, 1 insertion(+), 241 deletions(-)
 delete mode 100644 pmtiles/downloader.go
 delete mode 100644 pmtiles/downloader_test.go
 delete mode 100644 pmtiles/stats.go

diff --git a/main.go b/main.go
index 2c9201c..73d4c28 100644
--- a/main.go
+++ b/main.go
@@ -49,17 +49,13 @@ var cli struct {
         Output          string  `arg:"" help:"Output archive." type:"path"`
         Bucket          string  `help:"Remote bucket of input archive."`
         Region          string  `help:"local GeoJSON Polygon or MultiPolygon file for area of interest." type:"existingfile"`
-        Bbox            string  `help:"bbox area of interest" type:"string"`
+        Bbox            string  `help:"bbox area of interest: min_lon,min_lat,max_lon,max_lat" type:"string"`
         Maxzoom         int8    `default:-1 help:"Maximum zoom level, inclusive."`
         DownloadThreads int     `default:4 help:"Number of download threads."`
         DryRun          bool    `help:"Calculate tiles to extract, but don't download them."`
         Overfetch       float32 `default:0.05 help:"What ratio of extra data to download to minimize # requests; 0.2 is 20%"`
     } `cmd:"" help:"Create an archive from a larger archive for a subset of zoom levels or geographic region."`
 
-    Stats struct {
-        Input string `arg:"" type:"existingfile"`
-    } `cmd:"" help:"Add a vector tile statistics file (.tilestats.tsv.gz) used for further analysis with DuckDB."`
-
     Verify struct {
         Input string `arg:"" help:"Input archive." type:"existingfile"`
     } `cmd:"" help:"Verifies that a local archive is valid."`
@@ -130,11 +126,6 @@ func main() {
         if err != nil {
             logger.Fatalf("Failed to extract, %v", err)
         }
-    case "stats <input>":
-        err := pmtiles.Stats(logger, cli.Stats.Input)
-        if err != nil {
-            logger.Fatalf("Failed to stats archive, %v", err)
-        }
     case "convert <input> <output>":
         path := cli.Convert.Input
         output := cli.Convert.Output
diff --git a/pmtiles/downloader.go b/pmtiles/downloader.go
deleted file mode 100644
index b5db058..0000000
--- a/pmtiles/downloader.go
+++ /dev/null
@@ -1,80 +0,0 @@
-package pmtiles
-
-
-type Task struct {
-    Index int
-    Rng Range
-    Result chan TaskResult
-}
-
-type TaskResult struct {
-    Index int
-    Blob []byte
-}
-
-// returns a channel of results that exactly match the requested ranges.
-func DownloadParts(getter func (Range) []byte, ranges []Range, numThreads int) chan []byte {
-    intermediate := make(chan TaskResult, 8)
-    orderedOutput := make(chan []byte, 8)
-    tasks := make(chan Task, 100)
-
-    lastTask := len(ranges) - 1
-
-    worker := func (id int, tasks <-chan Task) {
-        for task := range tasks {
-            task.Result <- TaskResult{task.Index, getter(task.Rng)}
-        }
-    }
-
-    for i := 0; i < numThreads; i++ {
-        go worker(i, tasks)
-    }
-
-
-    // push into the queue on a separate goroutine
-    go func () {
-        for idx, r := range ranges {
-            tasks <- Task{Index: idx, Rng: r, Result: intermediate}
-        }
-        close(tasks)
-    }()
-
-    // a goroutine that listens on a channel
-    // and buffers the results, outputting them in exact sorted order
-    // once it has received all results, it closes the result channel
-    go func() {
-        buffer := make(map[int]TaskResult)
-        nextIndex := 0
-
-        for i := range intermediate {
-            buffer[i.Index] = i
-
-            for {
-                if next, ok := buffer[nextIndex]; ok {
-                    orderedOutput <- next.Blob
-                    delete(buffer, nextIndex)
-                    nextIndex++
-
-                    if (nextIndex == lastTask) {
-                        close(intermediate)
-                    }
-                } else {
-                    break
-                }
-            }
-        }
-
-        close(orderedOutput)
-    }()
-
-    return orderedOutput
-}
-
-// an number for overhead: 0.2 is 20% overhead, 1.0 is 100% overhead
-// a number of maximum chunk size: n chunks * threads is the max memory usage
-// store the smallest gaps in a heap; merge ranges until overhead budget is reached
-func DownloadBatchedParts(getter func (Range) []byte, ranges []Range, overhead float32, maxSizeBytes int, numThreads int) chan []byte {
-    orderedOutput := make(chan []byte, 8)
-    return orderedOutput
-}
-
diff --git a/pmtiles/downloader_test.go b/pmtiles/downloader_test.go
deleted file mode 100644
index 8de3d18..0000000
--- a/pmtiles/downloader_test.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package pmtiles
-
-import (
-    "github.com/stretchr/testify/assert"
-    "testing"
-    "encoding/binary"
-    "time"
-)
-
-func TestDownloadParts(t *testing.T) {
-    fakeGet := func(rng Range) []byte {
-        time.Sleep(time.Millisecond * time.Duration(3 - rng.Offset))
-        bytes := make([]byte, 8)
-        binary.LittleEndian.PutUint64(bytes, rng.Offset)
-        return bytes
-    }
-
-    wanted := make([]Range, 0)
-    wanted = append(wanted, Range{0,1})
-    wanted = append(wanted, Range{1,2})
-    wanted = append(wanted, Range{2,3})
-
-    result := DownloadParts(fakeGet, wanted, 3)
-
-    expected := uint64(0)
-
-    for x := range result {
-        assert.Equal(t, expected, binary.LittleEndian.Uint64(x))
-        expected += 1
-    }
-
-    assert.Equal(t, expected, uint64(3))
-}
diff --git a/pmtiles/stats.go b/pmtiles/stats.go
deleted file mode 100644
index cefa678..0000000
--- a/pmtiles/stats.go
+++ /dev/null
@@ -1,118 +0,0 @@
-package pmtiles
-
-import (
-    "bytes"
-    "compress/gzip"
-    "context"
-    "encoding/csv"
-    "fmt"
-    "github.com/RoaringBitmap/roaring/roaring64"
-    "io"
-    "log"
-    "os"
-    "strconv"
-    "time"
-)
-
-func Stats(logger *log.Logger, file string) error {
-    start := time.Now()
-    ctx := context.Background()
-
-    bucketURL, key, err := NormalizeBucketKey("", "", file)
-
-    if err != nil {
-        return err
-    }
-
-    bucket, err := OpenBucket(ctx, bucketURL, "")
-
-    if err != nil {
-        return fmt.Errorf("Failed to open bucket for %s, %w", bucketURL, err)
-    }
-    defer bucket.Close()
-
-    r, err := bucket.NewRangeReader(ctx, key, 0, 16384)
-
-    if err != nil {
-        return fmt.Errorf("Failed to create range reader for %s, %w", key, err)
-    }
-    b, err := io.ReadAll(r)
-    if err != nil {
-        return fmt.Errorf("Failed to read %s, %w", key, err)
-    }
-    r.Close()
-
-    header, err := deserialize_header(b[0:HEADERV3_LEN_BYTES])
-    if header.TileType != Mvt {
-        return fmt.Errorf("Stats only works on MVT vector tilesets.")
-    }
-
-    // Pass 1: through the entire entry set, finding all non-duplicated tiles.
-
-    var CollectEntries func(uint64, uint64, func(EntryV3))
-
-    CollectEntries = func(dir_offset uint64, dir_length uint64, f func(EntryV3)) {
-        dirbytes, err := bucket.NewRangeReader(ctx, key, int64(dir_offset), int64(dir_length))
-        if err != nil {
-            panic(fmt.Errorf("I/O error"))
-        }
-        defer dirbytes.Close()
-        b, err = io.ReadAll(dirbytes)
-        if err != nil {
-            panic(fmt.Errorf("I/O Error"))
-        }
-
-        directory := deserialize_entries(bytes.NewBuffer(b))
-        for _, entry := range directory {
-            if entry.RunLength > 0 {
-                f(entry)
-            } else {
-                CollectEntries(header.LeafDirectoryOffset+entry.Offset, uint64(entry.Length), f)
-            }
-        }
-    }
-
-    seen_once := roaring64.New()
-    seen_twice := roaring64.New()
-    CollectEntries(header.RootOffset, header.RootLength, func(e EntryV3) {
-        if seen_once.Contains(e.Offset) {
-            seen_twice.Add(e.Offset)
-        }
-        seen_once.Add(e.Offset)
-    })
-
-    seen_once.AndNot(seen_twice)
-    fmt.Println("Non-duplicate tiles:", seen_once.GetCardinality())
-
-    // pass 2: decompress and parse tiles in order.
-
-    output, err := os.Create(file + ".stats.tsv.gz")
-    if err != nil {
-        return fmt.Errorf("Failed to create output")
-    }
-    defer output.Close()
-
-    gzWriter := gzip.NewWriter(output)
-    defer gzWriter.Close()
-
-    csvWriter := csv.NewWriter(gzWriter)
-    csvWriter.Comma = '\t'
-    defer csvWriter.Flush()
-    if err := csvWriter.Write([]string{"z", "x", "y", "bytes_compressed"}); err != nil {
-        return fmt.Errorf("Failed to write header to TSV: %v", err)
-    }
-
-    CollectEntries(header.RootOffset, header.RootLength, func(e EntryV3) {
-        if seen_once.Contains(e.Offset) {
-            z, x, y := IdToZxy(e.TileId)
-            row := []string{strconv.FormatUint(uint64(z), 10), strconv.FormatUint(uint64(x), 10), strconv.FormatUint(uint64(y), 10), strconv.FormatUint(uint64(e.Length), 10)}
-            if err := csvWriter.Write(row); err != nil {
-                panic(fmt.Errorf("Failed to write record to TSV: %v", err))
-            }
-        }
-    })
-
-    fmt.Printf("Completed stats in %v.\n", time.Since(start))
-    return nil
-}
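--

Note on the help-text change above: the extract command's --bbox value is documented as
"min_lon,min_lat,max_lon,max_lat", four comma-separated longitude/latitude values in
degrees. The Go sketch below illustrates one way such a string could be parsed and
validated; parseBbox is a hypothetical helper written for this note, not the actual
parsing code in go-pmtiles.

package main

import (
    "fmt"
    "strconv"
    "strings"
)

// parseBbox splits a "min_lon,min_lat,max_lon,max_lat" string into four floats and
// checks ordering and coordinate ranges. Hypothetical helper, for illustration only.
func parseBbox(s string) (minLon, minLat, maxLon, maxLat float64, err error) {
    parts := strings.Split(s, ",")
    if len(parts) != 4 {
        return 0, 0, 0, 0, fmt.Errorf("expected 4 comma-separated values, got %d", len(parts))
    }
    vals := make([]float64, 4)
    for i, p := range parts {
        v, parseErr := strconv.ParseFloat(strings.TrimSpace(p), 64)
        if parseErr != nil {
            return 0, 0, 0, 0, fmt.Errorf("value %q is not a number: %w", p, parseErr)
        }
        vals[i] = v
    }
    minLon, minLat, maxLon, maxLat = vals[0], vals[1], vals[2], vals[3]
    if minLon >= maxLon || minLat >= maxLat {
        return 0, 0, 0, 0, fmt.Errorf("min values must be less than max values")
    }
    if minLon < -180 || maxLon > 180 || minLat < -90 || maxLat > 90 {
        return 0, 0, 0, 0, fmt.Errorf("coordinates out of range")
    }
    return minLon, minLat, maxLon, maxLat, nil
}

func main() {
    // Example bbox covering roughly central San Francisco.
    minLon, minLat, maxLon, maxLat, err := parseBbox("-122.52,37.70,-122.35,37.84")
    if err != nil {
        panic(err)
    }
    fmt.Println(minLon, minLat, maxLon, maxLat)
}

Given the Kong CLI definition in the patch (positional Input/Output arguments plus a
Bbox flag), an extract invocation using the flag would presumably look like:

    pmtiles extract input.pmtiles sf.pmtiles --bbox=-122.52,37.70,-122.35,37.84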