Skip to content

Commit

Permalink
Use shared compression function
Browse files Browse the repository at this point in the history
  • Loading branch information
pka committed Oct 23, 2024
1 parent 72f5762 commit 945ef23
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ __async-aws-s3 = ["__async", "dep:aws-sdk-s3"]
aws-sdk-s3 = { version = "1.49.0", optional = true }
async-compression = { version = "0.4", features = ["gzip"] }
bytes = "1"
countio = "0.2.19"
flate2 = "1"
fmmap = { version = "0.3", default-features = false, optional = true }
hilbert_2d = "1"
Expand Down
6 changes: 4 additions & 2 deletions src/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use bytes::{Buf, Bytes};
use varint_rs::{VarintReader, VarintWriter};

use crate::error::PmtError;
use crate::writer::WriteTo;

#[derive(Default, Clone)]
pub struct Directory {
Expand Down Expand Up @@ -93,8 +94,8 @@ impl TryFrom<Bytes> for Directory {
}
}

impl Directory {
pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
impl WriteTo for Directory {
fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
// Write number of entries
writer.write_usize_varint(self.entries.len())?;

Expand Down Expand Up @@ -154,6 +155,7 @@ mod tests {
use super::Directory;
use crate::header::HEADER_SIZE;
use crate::tests::RASTER_FILE;
use crate::writer::WriteTo;
use crate::Header;

fn read_root_directory(fname: &str) -> Directory {
Expand Down
9 changes: 6 additions & 3 deletions src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{io, io::Write};
use bytes::{Buf, Bytes};

use crate::error::{PmtError, PmtResult};
use crate::writer::WriteTo;

pub(crate) const MAX_INITIAL_BYTES: usize = 16_384;
pub(crate) const HEADER_SIZE: usize = 127;
Expand Down Expand Up @@ -199,8 +200,8 @@ impl Header {
}
}

impl Header {
pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
impl WriteTo for Header {
fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
// Write magic number
writer.write_all(V3_MAGIC.as_bytes())?;

Expand Down Expand Up @@ -233,7 +234,8 @@ impl Header {

Ok(())
}

}
impl Header {
#[allow(clippy::cast_possible_truncation)]
fn write_coordinate_part<W: Write>(writer: &mut W, value: f32) -> io::Result<()> {
writer.write_all(&((value * 10_000_000.0) as i32).to_le_bytes())
Expand All @@ -251,6 +253,7 @@ mod tests {

use crate::header::{Header, TileType, HEADER_SIZE};
use crate::tests::{RASTER_FILE, VECTOR_FILE};
use crate::writer::WriteTo;

#[test]
fn read_header() {
Expand Down
83 changes: 61 additions & 22 deletions src/writer.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,67 @@
use std::fs::File;
use std::io::{BufWriter, Seek, Write};

use countio::Counter;
use flate2::write::GzEncoder;

use crate::directory::{DirEntry, Directory};
use crate::error::PmtResult;
use crate::header::{HEADER_SIZE, MAX_INITIAL_BYTES};
use crate::PmtError::UnsupportedCompression;
use crate::{Compression, Header, TileType};

pub struct PmTilesWriter {
out: BufWriter<File>,
out: Counter<BufWriter<File>>,
header: Header,
entries: Vec<DirEntry>,
n_addressed_tiles: u64,
prev_tile_data: Vec<u8>,
}

pub(crate) trait WriteTo {
fn write_to<W: Write>(&self, writer: &mut W) -> std::io::Result<()>;
fn write_compressed_to<W: Write>(
&self,
writer: &mut W,
compression: Compression,
) -> PmtResult<()> {
match compression {
Compression::None => self.write_to(writer)?,
Compression::Gzip => {
let mut encoder = GzEncoder::new(writer, flate2::Compression::default());
self.write_to(&mut encoder)?;
}
v => Err(UnsupportedCompression(v))?,
}
Ok(())
}
fn write_compressed_to_counted<W: Write>(
&self,
writer: &mut Counter<W>,
compression: Compression,
) -> PmtResult<usize> {
let pos = writer.writer_bytes();
self.write_compressed_to(writer, compression)?;
Ok(writer.writer_bytes() - pos)
}
fn compressed_size(&self, compression: Compression) -> PmtResult<usize> {
let mut devnull = Counter::new(std::io::sink());
self.write_compressed_to(&mut devnull, compression)?;
Ok(devnull.writer_bytes())
}
}

impl WriteTo for [u8] {
fn write_to<W: Write>(&self, writer: &mut W) -> std::io::Result<()> {
writer.write_all(self)
}
}

impl PmTilesWriter {
pub fn create(name: &str, tile_type: TileType, metadata: &str) -> PmtResult<Self> {
let file = File::create(name)?;
let mut out = BufWriter::new(file);
let writer = BufWriter::new(file);
let mut out = Counter::new(writer);

// We use the following layout:
// +--------+----------------+----------+-----------+------------------+
Expand All @@ -32,15 +74,10 @@ impl PmTilesWriter {
// Reserve space for header and root directory
out.write_all(&[0u8; MAX_INITIAL_BYTES])?;

// let metadata_length = metadata.len() as u64;
// out.write_all(metadata.as_bytes())?;
let mut metadata_buf = vec![];
{
let mut encoder = GzEncoder::new(&mut metadata_buf, flate2::Compression::default());
encoder.write_all(metadata.as_bytes())?;
}
let metadata_length = metadata_buf.len() as u64;
out.write_all(&metadata_buf)?;
let metadata_length = metadata
.as_bytes()
.write_compressed_to_counted(&mut out, Compression::Gzip)?
as u64;

let header = Header {
version: 3,
Expand Down Expand Up @@ -107,12 +144,13 @@ impl PmTilesWriter {
} else {
let offset = last_entry.offset + u64::from(last_entry.length);
// Write tile
let length = data.len().try_into().expect("TODO: check max");
self.out.write_all(data)?;
let len =
data.write_compressed_to_counted(&mut self.out, self.header.tile_compression)?;
let length = len.try_into().expect("TODO: check max");

self.entries.push(DirEntry {
tile_id,
run_length: 1, // Will be increased if the next tile is the same
run_length: 1, // Will be increased by following identical tiles
offset,
length,
});
Expand All @@ -126,14 +164,18 @@ impl PmTilesWriter {
/// Build root and leaf directories from entries.
/// Leaf directories are written to output.
/// The root directory is returned.
fn build_directories(&self) -> Directory {
fn build_directories(&self) -> PmtResult<Directory> {
let mut root_dir = Directory::default();
for entry in &self.entries {
root_dir.push(entry.clone());
}
// FIXME: check max size of root directory
if root_dir.compressed_size(self.header.internal_compression)?
> MAX_INITIAL_BYTES - HEADER_SIZE
{
// TODO
}
// TODO: Build and write optimized leaf directories
root_dir
Ok(root_dir)
}

#[allow(clippy::missing_panics_doc)]
Expand All @@ -146,13 +188,10 @@ impl PmTilesWriter {
self.header.n_tile_contents = None; //TODO
}
// Write leaf directories and get root directory
let root_dir = self.build_directories();
let root_dir = self.build_directories()?;
// Determine compressed root directory length
let mut root_dir_buf = vec![];
{
let mut encoder = GzEncoder::new(&mut root_dir_buf, flate2::Compression::default());
root_dir.write_to(&mut encoder)?;
}
root_dir.write_compressed_to(&mut root_dir_buf, self.header.internal_compression)?;
self.header.root_length = root_dir_buf.len() as u64;

// Write header and root directory
Expand Down

0 comments on commit 945ef23

Please sign in to comment.