
Add basic write support #45

Open · wants to merge 20 commits into main
6 changes: 4 additions & 2 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pmtiles"
version = "0.11.0"
version = "0.12.0"
edition = "2021"
authors = ["Luke Seelenbinder <[email protected]>"]
license = "MIT OR Apache-2.0"
@@ -38,6 +38,8 @@ __async-aws-s3 = ["__async", "dep:aws-sdk-s3"]
aws-sdk-s3 = { version = "1.49.0", optional = true }
async-compression = { version = "0.4", features = ["gzip"] }
bytes = "1"
countio = "0.2.19"
flate2 = "1"
fmmap = { version = "0.3", default-features = false, optional = true }
hilbert_2d = "1"
reqwest = { version = "0.12.4", default-features = false, optional = true }
@@ -50,9 +52,9 @@ tokio = { version = "1", default-features = false, features = ["io-util"], optio
varint-rs = "2"

[dev-dependencies]
flate2 = "1"
fmmap = { version = "0.3", features = ["tokio-async"] }
reqwest = { version = "0.12.4", features = ["rustls-tls-webpki-roots"] }
tempfile = "3.13.0"
tokio = { version = "1", features = ["test-util", "macros", "rt"] }

[package.metadata.docs.rs]
17 changes: 16 additions & 1 deletion README.md
@@ -17,16 +17,31 @@ originally created by Brandon Liu for Protomaps.
- Async `mmap` (Tokio) for local files
- Async `http` and `https` (Reqwest + Tokio) for URLs
- Async `s3` (Rust-S3 + Tokio) for S3-compatible buckets
- Creating PMTiles archives

## Plans & TODOs

- [ ] Documentation and example code
- [ ] Support writing and conversion to and from MBTiles + `x/y/z`
- [ ] Support conversion to and from MBTiles + `x/y/z`
- [ ] Support additional backends (sync `mmap` and `http` at least)
- [ ] Support additional async styles (e.g., `async-std`)

PRs welcome!

## Usage examples

### Writing a PMTiles file

```rust
use pmtiles::{tile_id, PmTilesWriter, TileType};
use std::fs::File;

let file = File::create("tiles.pmtiles").unwrap();
let mut writer = PmTilesWriter::new(TileType::Mvt).create(file).unwrap();
writer.add_tile(tile_id(0, 0, 0), &[/*...*/]).unwrap();
writer.finalize().unwrap();
```
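
As a quick sanity check on the `tile_id` helper used above: IDs follow the PMTiles v3 Hilbert ordering, so the values below come from the spec, not from this PR (a sketch, not part of the README diff):

```rust
use pmtiles::tile_id;

// IDs enumerate zoom levels in order: zoom 0 occupies ID 0,
// zoom 1 occupies IDs 1..=4 along the Hilbert curve, and so on.
assert_eq!(tile_id(0, 0, 0), 0);
assert_eq!(tile_id(1, 0, 0), 1);
assert_eq!(tile_id(1, 1, 0), 4);
assert_eq!(tile_id(2, 0, 0), 5);
```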

## Development
* This project is easier to develop with [just](https://github.com/casey/just#readme), a modern alternative to `make`. Install it with `cargo install just`.
* To get a list of available commands, run `just`.
Expand Down
3 changes: 3 additions & 0 deletions justfile
Expand Up @@ -21,6 +21,9 @@ test:
cargo test
RUSTDOCFLAGS="-D warnings" cargo doc --no-deps

test-writer:
cargo test --features mmap-async-tokio

# Run all tests and checks
test-all: check fmt clippy

Expand Down
8 changes: 7 additions & 1 deletion src/async_reader.rs
@@ -64,7 +64,10 @@ impl<B: AsyncBackend + Sync + Send, C: DirectoryCache + Sync + Send> AsyncPmTile

/// Fetches tile bytes from the archive.
pub async fn get_tile(&self, z: u8, x: u64, y: u64) -> PmtResult<Option<Bytes>> {
let tile_id = tile_id(z, x, y);
self.get_tile_by_id(tile_id(z, x, y)).await
}

pub(crate) async fn get_tile_by_id(&self, tile_id: u64) -> PmtResult<Option<Bytes>> {
let Some(entry) = self.find_tile_entry(tile_id).await? else {
return Ok(None);
};
@@ -206,6 +209,9 @@ impl<B: AsyncBackend + Sync + Send, C: DirectoryCache + Sync + Send> AsyncPmTile
.read_to_end(&mut decompressed_bytes)
> **Member:** The `decompressed_bytes` declaration should probably move inside of this now, if we're supporting compression modes where it's not needed.
.await?;
}
Compression::None => {
return Ok(bytes);
}
v => Err(UnsupportedCompression(v))?,
}
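
A sketch of the restructure the comment above suggests, allocating the buffer only in the arm that decompresses (hypothetical fragment of the same method; the capacity heuristic is an assumption, not from this PR):

```rust
// Hypothetical restructure per the review comment above: the buffer
// exists only where decompression actually needs it.
match compression {
    Compression::Gzip => {
        // Capacity heuristic is an assumption, not from this PR.
        let mut decompressed_bytes = Vec::with_capacity(bytes.len() * 2);
        GzipDecoder::new(&bytes[..])
            .read_to_end(&mut decompressed_bytes)
            .await?;
        Ok(Bytes::from(decompressed_bytes))
    }
    Compression::None => Ok(bytes),
    v => Err(UnsupportedCompression(v))?,
}
```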

92 changes: 85 additions & 7 deletions src/directory.rs
@@ -1,11 +1,13 @@
use std::fmt::{Debug, Formatter};
use std::{io, io::Write};

use bytes::{Buf, Bytes};
use varint_rs::VarintReader;
use varint_rs::{VarintReader, VarintWriter};

use crate::error::PmtError;
use crate::writer::WriteTo;

#[derive(Clone)]
#[derive(Default, Clone)]
pub struct Directory {
entries: Vec<DirEntry>,
}
@@ -17,6 +19,20 @@ impl Debug for Directory {
}

impl Directory {
pub(crate) fn with_capacity(capacity: usize) -> Self {
Self {
entries: Vec::with_capacity(capacity),
}
}

pub(crate) fn from_entries(entries: Vec<DirEntry>) -> Self {
Self { entries }
}

pub(crate) fn entries(&self) -> &[DirEntry] {
&self.entries
}

/// Find the directory entry for a given tile ID.
#[must_use]
pub fn find_tile_id(&self, tile_id: u64) -> Option<&DirEntry> {
@@ -43,6 +59,10 @@ impl Directory {
pub fn get_approx_byte_size(&self) -> usize {
self.entries.capacity() * size_of::<DirEntry>()
}

pub(crate) fn push(&mut self, entry: DirEntry) {
self.entries.push(entry);
}
}

impl TryFrom<Bytes> for Directory {
@@ -88,6 +108,44 @@
}
}

impl WriteTo for Directory {
fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
// Write number of entries
writer.write_usize_varint(self.entries.len())?;

// Write tile IDs
let mut last_tile_id = 0;
for entry in &self.entries {
writer.write_u64_varint(entry.tile_id - last_tile_id)?;
last_tile_id = entry.tile_id;
}

// Write Run Lengths
for entry in &self.entries {
writer.write_u32_varint(entry.run_length)?;
}

// Write Lengths
for entry in &self.entries {
writer.write_u32_varint(entry.length)?;
}

// Write Offsets
let mut last_offset = 0;
for entry in &self.entries {
let offset_to_write = if entry.offset == last_offset + u64::from(entry.length) {
0
} else {
entry.offset + 1
};
writer.write_u64_varint(offset_to_write)?;
last_offset = entry.offset;
}

Ok(())
}
}
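
The offset column above uses the format's contiguity shortcut: a stored `0` marks an entry whose data is contiguous with its predecessor (the writer computes the expected position as `last_offset + entry.length`), and any explicit offset is stored as `offset + 1` so that a real offset of zero stays representable. A standalone sketch of the rule and its inverse (hypothetical helpers, not part of this PR):

```rust
// Hypothetical helpers illustrating the offset column encoding above.
// `expected` is the position an entry would have if it were contiguous.
fn encode_offset(offset: u64, expected: u64) -> u64 {
    if offset == expected { 0 } else { offset + 1 }
}

fn decode_offset(encoded: u64, expected: u64) -> u64 {
    if encoded == 0 { expected } else { encoded - 1 }
}

fn main() {
    // A contiguous entry round-trips through 0; a gap is stored explicitly.
    assert_eq!(decode_offset(encode_offset(500, 500), 500), 500);
    assert_eq!(decode_offset(encode_offset(900, 500), 500), 900);
}
```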

#[derive(Clone, Default, Debug)]
pub struct DirEntry {
pub(crate) tile_id: u64,
@@ -106,16 +164,16 @@ impl DirEntry {
mod tests {
use std::io::{BufReader, Read, Write};

use bytes::BytesMut;
use bytes::{Bytes, BytesMut};

use super::Directory;
use crate::header::HEADER_SIZE;
use crate::tests::RASTER_FILE;
use crate::writer::WriteTo;
use crate::Header;

#[test]
fn read_root_directory() {
let test_file = std::fs::File::open(RASTER_FILE).unwrap();
fn read_root_directory(fname: &str) -> Directory {
let test_file = std::fs::File::open(fname).unwrap();
let mut reader = BufReader::new(test_file);

let mut header_bytes = BytesMut::zeroed(HEADER_SIZE);
Expand All @@ -131,8 +189,12 @@ mod tests {
gunzip.write_all(&directory_bytes).unwrap();
}

let directory = Directory::try_from(decompressed.freeze()).unwrap();
Directory::try_from(decompressed.freeze()).unwrap()
}

#[test]
fn root_directory() {
let directory = read_root_directory(RASTER_FILE);
assert_eq!(directory.entries.len(), 84);
// Note: this is not true for all tiles, just the first few...
for nth in 0..10 {
@@ -145,4 +207,20 @@
assert_eq!(directory.entries[58].offset, 422_070);
assert_eq!(directory.entries[58].length, 850);
}

#[test]
fn write_directory() {
let root_dir = read_root_directory(RASTER_FILE);
let mut buf = vec![];
root_dir.write_to(&mut buf).unwrap();
let dir = Directory::try_from(Bytes::from(buf)).unwrap();
assert!(root_dir
.entries
.iter()
.enumerate()
.all(|(idx, entry)| dir.entries[idx].tile_id == entry.tile_id
&& dir.entries[idx].run_length == entry.run_length
&& dir.entries[idx].offset == entry.offset
&& dir.entries[idx].length == entry.length));
}
}
3 changes: 3 additions & 0 deletions src/error.rs
@@ -9,6 +9,7 @@ pub type PmtResult<T> = Result<T, PmtError>;

/// Errors that can occur while reading `PMTiles` files.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum PmtError {
#[error("Invalid magic number")]
InvalidMagicNumber,
@@ -24,6 +25,8 @@ pub enum PmtError {
InvalidHeader,
#[error("Invalid metadata")]
InvalidMetadata,
#[error("Directory index element overflow")]
IndexEntryOverflow,
#[error("Invalid metadata UTF-8 encoding: {0}")]
InvalidMetadataUtf8Encoding(#[from] FromUtf8Error),
#[error("Invalid tile type")]
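
Since `PmtError` is now `#[non_exhaustive]`, downstream `match` expressions need a wildcard arm, which lets variants like the new `IndexEntryOverflow` be added later without a breaking change. A caller-side sketch (hypothetical function, not part of this PR):

```rust
use pmtiles::PmtError;

// Hypothetical caller-side handling; the wildcard arm is required
// because PmtError is #[non_exhaustive].
fn describe(err: &PmtError) -> &'static str {
    match err {
        PmtError::InvalidMagicNumber => "not a PMTiles archive",
        PmtError::IndexEntryOverflow => "directory entry too large to index",
        _ => "other PMTiles error",
    }
}
```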
76 changes: 73 additions & 3 deletions src/header.rs
@@ -1,13 +1,13 @@
use std::num::NonZeroU64;
use std::num::{NonZero, NonZeroU64};
use std::panic::catch_unwind;
use std::{io, io::Write};

use bytes::{Buf, Bytes};

use crate::error::{PmtError, PmtResult};
use crate::writer::WriteTo;

#[cfg(feature = "__async")]
pub(crate) const MAX_INITIAL_BYTES: usize = 16_384;
#[cfg(any(test, feature = "__async"))]
pub(crate) const HEADER_SIZE: usize = 127;

#[allow(dead_code)]
@@ -200,6 +200,48 @@ impl Header {
}
}

impl WriteTo for Header {
fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
// Write magic number
writer.write_all(V3_MAGIC.as_bytes())?;

// Write header fields
writer.write_all(&[self.version])?;
writer.write_all(&self.root_offset.to_le_bytes())?;
writer.write_all(&self.root_length.to_le_bytes())?;
writer.write_all(&self.metadata_offset.to_le_bytes())?;
writer.write_all(&self.metadata_length.to_le_bytes())?;
writer.write_all(&self.leaf_offset.to_le_bytes())?;
writer.write_all(&self.leaf_length.to_le_bytes())?;
writer.write_all(&self.data_offset.to_le_bytes())?;
writer.write_all(&self.data_length.to_le_bytes())?;
writer.write_all(&self.n_addressed_tiles.map_or(0, NonZero::get).to_le_bytes())?;
writer.write_all(&self.n_tile_entries.map_or(0, NonZero::get).to_le_bytes())?;
writer.write_all(&self.n_tile_contents.map_or(0, NonZero::get).to_le_bytes())?;
writer.write_all(&[u8::from(self.clustered)])?;
writer.write_all(&[self.internal_compression as u8])?;
writer.write_all(&[self.tile_compression as u8])?;
writer.write_all(&[self.tile_type as u8])?;
writer.write_all(&[self.min_zoom])?;
writer.write_all(&[self.max_zoom])?;
Self::write_coordinate_part(writer, self.min_longitude)?;
Self::write_coordinate_part(writer, self.min_latitude)?;
Self::write_coordinate_part(writer, self.max_longitude)?;
Self::write_coordinate_part(writer, self.max_latitude)?;
writer.write_all(&[self.center_zoom])?;
Self::write_coordinate_part(writer, self.center_longitude)?;
Self::write_coordinate_part(writer, self.center_latitude)?;

Ok(())
}
}
impl Header {
#[allow(clippy::cast_possible_truncation)]
fn write_coordinate_part<W: Write>(writer: &mut W, value: f32) -> io::Result<()> {
writer.write_all(&((value * 10_000_000.0) as i32).to_le_bytes())
}
}
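
`write_coordinate_part` stores each coordinate as a little-endian `i32` in units of 1e-7 degrees. A quick round-trip sketch (assuming the existing reader divides by the same factor):

```rust
fn main() {
    // Fixed-point round-trip: 1e-7 degree units, stored as i32.
    let longitude: f32 = 12.345_678;
    let fixed = (longitude * 10_000_000.0) as i32; // as written by write_coordinate_part
    let restored = fixed as f32 / 10_000_000.0; // as a reader would decode it
    assert!((longitude - restored).abs() < 1e-5);
}
```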

#[cfg(test)]
mod tests {
#![allow(clippy::unreadable_literal, clippy::float_cmp)]
@@ -211,6 +253,7 @@

use crate::header::{Header, TileType, HEADER_SIZE};
use crate::tests::{RASTER_FILE, VECTOR_FILE};
use crate::writer::WriteTo;

#[test]
fn read_header() {
@@ -303,4 +346,31 @@
))
);
}

#[test]
fn write_header() {
let mut test = File::open(RASTER_FILE).unwrap();
let mut header_bytes = [0; HEADER_SIZE];
test.read_exact(header_bytes.as_mut_slice()).unwrap();
let header = Header::try_from_bytes(Bytes::copy_from_slice(&header_bytes)).unwrap();

let mut buf = vec![];
header.write_to(&mut buf).unwrap();
let out = Header::try_from_bytes(Bytes::from(buf)).unwrap();
assert_eq!(header.version, out.version);
assert_eq!(header.tile_type, out.tile_type);
assert_eq!(header.n_addressed_tiles, out.n_addressed_tiles);
assert_eq!(header.n_tile_entries, out.n_tile_entries);
assert_eq!(header.n_tile_contents, out.n_tile_contents);
assert_eq!(header.min_zoom, out.min_zoom);
assert_eq!(header.max_zoom, out.max_zoom);
assert_eq!(header.center_zoom, out.center_zoom);
assert_eq!(header.center_latitude, out.center_latitude);
assert_eq!(header.center_longitude, out.center_longitude);
assert_eq!(header.min_latitude, out.min_latitude);
assert_eq!(header.max_latitude, out.max_latitude);
assert_eq!(header.min_longitude, out.min_longitude);
assert_eq!(header.max_longitude, out.max_longitude);
assert_eq!(header.clustered, out.clustered);
}
}
6 changes: 5 additions & 1 deletion src/lib.rs
@@ -1,3 +1,5 @@
#![allow(clippy::doc_markdown)]
#![cfg_attr(feature = "default", doc = include_str!("../README.md"))]
#![forbid(unsafe_code)]

#[cfg(feature = "__async")]
@@ -15,8 +17,8 @@ pub mod cache;
mod directory;
mod error;
mod header;
#[cfg(feature = "__async")]
mod tile;
mod writer;
> **Collaborator:** Could you clarify why `cfg(__async)` was removed here? We really ought to put this PR behind a feature flag.

> **Author:** I was glad to reduce the `cfg(__)` complexity a bit with 72f5762; it reminded me of the bad old C/C++ `#ifndef` days. IMO, a PMTiles library should come with its core functionality (like calculating a tile ID) and dependencies (like flate2) by default. I understand the feature flags for the different backend implementations with heavy or specialized dependencies, but not for everything. That said, if the maintainers prefer a feature flag for writing PMTiles files, I can give it a try.

> **Collaborator:** I agree that we shouldn't go overboard on features. My only concern is that if write support brings additional dependencies, it is better to make writing optional, simply because writing is a relatively rare usage compared to reading. My understanding is that you need compression, which might be a significant extra compilation burden, and that burden should not, IMO, be levied on users unless needed.

> **Member:** I agree a feature would be nice.

#[cfg(feature = "aws-s3-async")]
pub use backend_aws_s3::AwsS3Backend;
@@ -29,6 +31,8 @@ pub use backend_s3::S3Backend;
pub use directory::{DirEntry, Directory};
pub use error::{PmtError, PmtResult};
pub use header::{Compression, Header, TileType};
pub use tile::tile_id;
pub use writer::{PmTilesStreamWriter, PmTilesWriter};
//
// Re-export crates exposed in our API to simplify dependency management
#[cfg(feature = "__async-aws-s3")]