Skip to content

Commit

Permalink
add hashmap-based directory cache traits
Browse files Browse the repository at this point in the history
  • Loading branch information
nyurik committed Nov 9, 2023
1 parent 2cb2cfe commit b32fe19
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 17 deletions.
74 changes: 62 additions & 12 deletions src/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use reqwest::{Client, IntoUrl};
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
use tokio::io::AsyncReadExt;

use crate::cache::SearchResult;
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
use crate::cache::{DirectoryCache, NoCache};
use crate::directory::{Directory, Entry};
use crate::error::Error;
use crate::header::{HEADER_SIZE, MAX_INITIAL_BYTES};
Expand All @@ -19,17 +22,27 @@ use crate::mmap::MmapBackend;
use crate::tile::tile_id;
use crate::{Compression, Header};

pub struct AsyncPmTilesReader<B> {
pub struct AsyncPmTilesReader<B, C = NoCache> {
backend: B,
cache: C,
header: Header,
root_directory: Directory,
}

impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
/// Creates a new reader from a specified source and validates the provided PMTiles archive is valid.
impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B, NoCache> {
/// Creates a new cached reader from a specified source and validates the provided PMTiles archive is valid.
///
/// Note: Prefer using new_with_* methods.
pub async fn try_from_source(backend: B) -> Result<Self, Error> {
Self::try_from_cached_source(backend, NoCache).await
}
}

impl<B: AsyncBackend + Sync + Send, C: DirectoryCache + Sync + Send> AsyncPmTilesReader<B, C> {
/// Creates a new reader from a specified source and validates the provided PMTiles archive is valid.
///
/// Note: Prefer using new_with_* methods.
pub async fn try_from_cached_source(backend: B, cache: C) -> Result<Self, Error> {
// Read the first 127 and up to 16,384 bytes to ensure we can initialize the header and root directory.
let mut initial_bytes = backend.read(0, MAX_INITIAL_BYTES).await?;
if initial_bytes.len() < HEADER_SIZE {
Expand All @@ -47,11 +60,14 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {

Ok(Self {
backend,
cache,
header,
root_directory,
})
}
}

impl<B: AsyncBackend + Sync + Send, C: DirectoryCache + Sync + Send> AsyncPmTilesReader<B, C> {
/// Fetches tile bytes from the archive.
pub async fn get_tile(&self, z: u8, x: u64, y: u64) -> Option<Bytes> {
let tile_id = tile_id(z, x, y);
Expand Down Expand Up @@ -137,11 +153,21 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
// the recursion is done as two functions because it is a bit cleaner,
// and it allows directory to be cached later without cloning it first.
let offset = (self.header.leaf_offset + entry.offset) as _;
let length = entry.length as _;
let dir = self.read_directory(offset, length).await.ok()?;
let entry = dir.find_tile_id(tile_id);

if let Some(entry) = entry {
let entry = match self.cache.get_dir_entry(offset, tile_id) {
SearchResult::NotCached => {
// Cache miss - read from backend
let length = entry.length as _;
let dir = self.read_directory(offset, length).await.ok()?;
let entry = dir.find_tile_id(tile_id).cloned();
self.cache.insert_dir(offset, dir);
entry
}
SearchResult::NotFound => None,
SearchResult::Found(entry) => Some(entry),
};

if let Some(ref entry) = entry {
if entry.is_leaf() {
return if depth <= 4 {
self.find_entry_rec(tile_id, entry, depth + 1).await
Expand All @@ -151,7 +177,7 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
}
}

entry.cloned()
entry
}

async fn read_directory(&self, offset: usize, length: usize) -> Result<Directory, Error> {
Expand Down Expand Up @@ -183,26 +209,50 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
}

#[cfg(feature = "http-async")]
impl AsyncPmTilesReader<HttpBackend> {
impl AsyncPmTilesReader<HttpBackend, NoCache> {
/// Creates a new PMTiles reader from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_url<U: IntoUrl>(client: Client, url: U) -> Result<Self, Error> {
Self::new_with_cached_url(client, url, NoCache).await
}
}

#[cfg(feature = "http-async")]
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<HttpBackend, C> {
/// Creates a new PMTiles reader with cache from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_cached_url<U: IntoUrl>(
client: Client,
url: U,
cache: C,
) -> Result<Self, Error> {
let backend = HttpBackend::try_from(client, url)?;

Self::try_from_source(backend).await
Self::try_from_cached_source(backend, cache).await
}
}

#[cfg(feature = "mmap-async-tokio")]
impl AsyncPmTilesReader<MmapBackend> {
impl AsyncPmTilesReader<MmapBackend, NoCache> {
/// Creates a new PMTiles reader from a file path using the async mmap backend.
///
/// Fails if [p] does not exist or is an invalid archive.
pub async fn new_with_path<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
Self::new_with_cached_path(path, NoCache).await
}
}

#[cfg(feature = "mmap-async-tokio")]
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<MmapBackend, C> {
/// Creates a new cached PMTiles reader from a file path using the async mmap backend.
///
/// Fails if [p] does not exist or is an invalid archive.
pub async fn new_with_cached_path<P: AsRef<Path>>(path: P, cache: C) -> Result<Self, Error> {
let backend = MmapBackend::try_from(path).await?;

Self::try_from_source(backend).await
Self::try_from_cached_source(backend, cache).await
}
}

Expand Down
60 changes: 60 additions & 0 deletions src/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use crate::directory::{Directory, Entry};

pub enum SearchResult {
NotCached,
NotFound,
Found(Entry),
}

impl From<Option<&Entry>> for SearchResult {
fn from(entry: Option<&Entry>) -> Self {
match entry {
Some(entry) => SearchResult::Found(entry.clone()),
None => SearchResult::NotFound,
}
}
}

/// A cache for PMTiles directories.
pub trait DirectoryCache {
/// Get a directory from the cache, using the offset as a key.
fn get_dir_entry(&self, offset: usize, tile_id: u64) -> SearchResult;

/// Insert a directory into the cache, using the offset as a key.
/// Note that cache must be internally mutable.
fn insert_dir(&self, offset: usize, directory: Directory);
}

pub struct NoCache;

impl DirectoryCache for NoCache {
#[inline]
fn get_dir_entry(&self, _offset: usize, _tile_id: u64) -> SearchResult {
SearchResult::NotCached
}

#[inline]
fn insert_dir(&self, _offset: usize, _directory: Directory) {}
}

/// A simple HashMap-based implementation of a PMTiles directory cache.
#[derive(Default)]
pub struct HashMapCache {
pub cache: Arc<Mutex<HashMap<usize, Directory>>>,
}

impl DirectoryCache for HashMapCache {
fn get_dir_entry(&self, offset: usize, tile_id: u64) -> SearchResult {
if let Some(dir) = self.cache.lock().unwrap().get(&offset) {
return dir.find_tile_id(tile_id).into();
}
SearchResult::NotCached
}

fn insert_dir(&self, offset: usize, directory: Directory) {
self.cache.lock().unwrap().insert(offset, directory);
}
}
6 changes: 3 additions & 3 deletions src/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use varint_rs::VarintReader;

use crate::error::Error;

pub(crate) struct Directory {
pub struct Directory {
entries: Vec<Entry>,
}

Expand Down Expand Up @@ -81,7 +81,7 @@ impl TryFrom<Bytes> for Directory {
}

#[derive(Clone, Default, Debug)]
pub(crate) struct Entry {
pub struct Entry {
pub(crate) tile_id: u64,
pub(crate) offset: u64,
pub(crate) length: u32,
Expand All @@ -90,7 +90,7 @@ pub(crate) struct Entry {

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
impl Entry {
pub fn is_leaf(&self) -> bool {
pub(crate) fn is_leaf(&self) -> bool {
self.run_length == 0
}
}
Expand Down
10 changes: 8 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
#![forbid(unsafe_code)]

mod tile;

mod header;
pub use crate::header::{Compression, Header, TileType};

mod directory;
pub use directory::{Directory, Entry};

pub mod error;
mod header;

#[cfg(feature = "http-async")]
pub mod http;
Expand All @@ -14,7 +18,9 @@ pub mod mmap;

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
pub mod async_reader;
pub mod tile;

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
pub mod cache;

#[cfg(test)]
mod tests {
Expand Down

0 comments on commit b32fe19

Please sign in to comment.