Directory Caching trait (#24)
Adds a cache implementation: a new `DirectoryCache` trait and a few helpers. `Directory` and `DirEntry` are now public, but neither has any public functions except for `find_tile_id`, which a cache should use while locking its internal state if needed.
nyurik authored Nov 10, 2023
1 parent ef1aee2 commit e7f9444
Showing 4 changed files with 143 additions and 25 deletions.
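For context, the sketch below shows how the new cache plugs into the reader from a downstream crate. It is not part of the commit: it assumes the crate is consumed as `pmtiles` with the `mmap-async-tokio` feature enabled, and the archive path, the `tokio::main` scaffolding, and the commented-out `get_tile` call are illustrative only.

use pmtiles::async_reader::AsyncPmTilesReader;
use pmtiles::cache::HashMapCache;

#[tokio::main]
async fn main() -> Result<(), pmtiles::PmtError> {
    // Shared, internally mutable directory cache added by this commit.
    let cache = HashMapCache::default();
    // Same as new_with_path, but leaf directories are cached between lookups.
    let reader = AsyncPmTilesReader::new_with_cached_path(cache, "tiles.pmtiles").await?;
    // Tile lookups are unchanged; only directory reads go through the cache.
    // let tile = reader.get_tile(z, x, y).await;
    Ok(())
}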
76 changes: 62 additions & 14 deletions src/async_reader.rs
@@ -9,7 +9,10 @@ use reqwest::{Client, IntoUrl};
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
use tokio::io::AsyncReadExt;

use crate::directory::{Directory, Entry};
use crate::cache::DirCacheResult;
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
use crate::cache::{DirectoryCache, NoCache};
use crate::directory::{DirEntry, Directory};
use crate::error::PmtError;
use crate::header::{HEADER_SIZE, MAX_INITIAL_BYTES};
#[cfg(feature = "http-async")]
@@ -19,17 +22,27 @@ use crate::mmap::MmapBackend;
use crate::tile::tile_id;
use crate::{Compression, Header};

pub struct AsyncPmTilesReader<B> {
pub struct AsyncPmTilesReader<B, C = NoCache> {
backend: B,
cache: C,
header: Header,
root_directory: Directory,
}

impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B, NoCache> {
/// Creates a new reader from a specified source and validates the provided PMTiles archive is valid.
///
/// Note: Prefer using new_with_* methods.
pub async fn try_from_source(backend: B) -> Result<Self, PmtError> {
Self::try_from_cached_source(backend, NoCache).await
}
}

impl<B: AsyncBackend + Sync + Send, C: DirectoryCache + Sync + Send> AsyncPmTilesReader<B, C> {
/// Creates a new cached reader from a specified source and validates the provided PMTiles archive is valid.
///
/// Note: Prefer using new_with_* methods.
pub async fn try_from_cached_source(backend: B, cache: C) -> Result<Self, PmtError> {
// Read the first 127 and up to 16,384 bytes to ensure we can initialize the header and root directory.
let mut initial_bytes = backend.read(0, MAX_INITIAL_BYTES).await?;
if initial_bytes.len() < HEADER_SIZE {
@@ -47,6 +60,7 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {

Ok(Self {
backend,
cache,
header,
root_directory,
})
@@ -130,7 +144,7 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
}

/// Recursively locates a tile in the archive.
async fn find_tile_entry(&self, tile_id: u64) -> Option<Entry> {
async fn find_tile_entry(&self, tile_id: u64) -> Option<DirEntry> {
let entry = self.root_directory.find_tile_id(tile_id);
if let Some(entry) = entry {
if entry.is_leaf() {
@@ -141,15 +155,25 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
}

#[async_recursion]
async fn find_entry_rec(&self, tile_id: u64, entry: &Entry, depth: u8) -> Option<Entry> {
async fn find_entry_rec(&self, tile_id: u64, entry: &DirEntry, depth: u8) -> Option<DirEntry> {
// the recursion is done as two functions because it is a bit cleaner,
// and it allows directory to be cached later without cloning it first.
let offset = (self.header.leaf_offset + entry.offset) as _;
let length = entry.length as _;
let dir = self.read_directory(offset, length).await.ok()?;
let entry = dir.find_tile_id(tile_id);

if let Some(entry) = entry {
let entry = match self.cache.get_dir_entry(offset, tile_id).await {
DirCacheResult::NotCached => {
// Cache miss - read from backend
let length = entry.length as _;
let dir = self.read_directory(offset, length).await.ok()?;
let entry = dir.find_tile_id(tile_id).cloned();
self.cache.insert_dir(offset, dir).await;
entry
}
DirCacheResult::NotFound => None,
DirCacheResult::Found(entry) => Some(entry),
};

if let Some(ref entry) = entry {
if entry.is_leaf() {
return if depth <= 4 {
self.find_entry_rec(tile_id, entry, depth + 1).await
@@ -159,7 +183,7 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
}
}

entry.cloned()
entry
}

async fn read_directory(&self, offset: usize, length: usize) -> Result<Directory, PmtError> {
@@ -191,26 +215,50 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
}

#[cfg(feature = "http-async")]
impl AsyncPmTilesReader<HttpBackend> {
impl AsyncPmTilesReader<HttpBackend, NoCache> {
/// Creates a new PMTiles reader from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_url<U: IntoUrl>(client: Client, url: U) -> Result<Self, PmtError> {
Self::new_with_cached_url(NoCache, client, url).await
}
}

#[cfg(feature = "http-async")]
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<HttpBackend, C> {
/// Creates a new PMTiles reader with cache from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_cached_url<U: IntoUrl>(
cache: C,
client: Client,
url: U,
) -> Result<Self, PmtError> {
let backend = HttpBackend::try_from(client, url)?;

Self::try_from_source(backend).await
Self::try_from_cached_source(backend, cache).await
}
}

#[cfg(feature = "mmap-async-tokio")]
impl AsyncPmTilesReader<MmapBackend> {
impl AsyncPmTilesReader<MmapBackend, NoCache> {
/// Creates a new PMTiles reader from a file path using the async mmap backend.
///
/// Fails if [p] does not exist or is an invalid archive.
pub async fn new_with_path<P: AsRef<Path>>(path: P) -> Result<Self, PmtError> {
Self::new_with_cached_path(NoCache, path).await
}
}

#[cfg(feature = "mmap-async-tokio")]
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<MmapBackend, C> {
/// Creates a new cached PMTiles reader from a file path using the async mmap backend.
///
/// Fails if [p] does not exist or is an invalid archive.
pub async fn new_with_cached_path<P: AsRef<Path>>(cache: C, path: P) -> Result<Self, PmtError> {
let backend = MmapBackend::try_from(path).await?;

Self::try_from_source(backend).await
Self::try_from_cached_source(backend, cache).await
}
}

65 changes: 65 additions & 0 deletions src/cache.rs
@@ -0,0 +1,65 @@
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;

use crate::directory::{DirEntry, Directory};

pub enum DirCacheResult {
NotCached,
NotFound,
Found(DirEntry),
}

impl From<Option<&DirEntry>> for DirCacheResult {
fn from(entry: Option<&DirEntry>) -> Self {
match entry {
Some(entry) => DirCacheResult::Found(entry.clone()),
None => DirCacheResult::NotFound,
}
}
}

/// A cache for PMTiles directories.
#[async_trait]
pub trait DirectoryCache {
/// Get a directory from the cache, using the offset as a key.
async fn get_dir_entry(&self, offset: usize, tile_id: u64) -> DirCacheResult;

/// Insert a directory into the cache, using the offset as a key.
/// Note that cache must be internally mutable.
async fn insert_dir(&self, offset: usize, directory: Directory);
}

pub struct NoCache;

#[async_trait]
impl DirectoryCache for NoCache {
#[inline]
async fn get_dir_entry(&self, _offset: usize, _tile_id: u64) -> DirCacheResult {
DirCacheResult::NotCached
}

#[inline]
async fn insert_dir(&self, _offset: usize, _directory: Directory) {}
}

/// A simple HashMap-based implementation of a PMTiles directory cache.
#[derive(Default)]
pub struct HashMapCache {
pub cache: Arc<RwLock<HashMap<usize, Directory>>>,
}

#[async_trait]
impl DirectoryCache for HashMapCache {
async fn get_dir_entry(&self, offset: usize, tile_id: u64) -> DirCacheResult {
if let Some(dir) = self.cache.read().unwrap().get(&offset) {
return dir.find_tile_id(tile_id).into();
}
DirCacheResult::NotCached
}

async fn insert_dir(&self, offset: usize, directory: Directory) {
self.cache.write().unwrap().insert(offset, directory);
}
}
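`HashMapCache` above never evicts; the trait leaves eviction policy to the implementor. The following sketch is illustrative only and not part of the commit: it outlines a size-bounded cache for a downstream crate that depends on `async-trait` and enables one of the async features (so `find_tile_id` is compiled in). `BoundedCache` and `max_dirs` are invented names, not crate API.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use pmtiles::cache::{DirCacheResult, DirectoryCache};
use pmtiles::Directory;

// Illustrative cache that stops accepting new directories once it holds `max_dirs` of them.
pub struct BoundedCache {
    max_dirs: usize,
    dirs: Arc<RwLock<HashMap<usize, Directory>>>,
}

#[async_trait]
impl DirectoryCache for BoundedCache {
    async fn get_dir_entry(&self, offset: usize, tile_id: u64) -> DirCacheResult {
        // Hold the read lock only while calling find_tile_id, as the commit message advises.
        if let Some(dir) = self.dirs.read().unwrap().get(&offset) {
            return dir.find_tile_id(tile_id).into();
        }
        DirCacheResult::NotCached
    }

    async fn insert_dir(&self, offset: usize, directory: Directory) {
        let mut dirs = self.dirs.write().unwrap();
        // Naive bound: refuse inserts when full (a real cache would evict instead).
        if dirs.len() < self.max_dirs {
            dirs.insert(offset, directory);
        }
    }
}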
17 changes: 9 additions & 8 deletions src/directory.rs
@@ -5,8 +5,9 @@ use varint_rs::VarintReader;

use crate::error::PmtError;

pub(crate) struct Directory {
entries: Vec<Entry>,
#[derive(Clone)]
pub struct Directory {
entries: Vec<DirEntry>,
}

impl Debug for Directory {
@@ -17,7 +18,7 @@ impl Debug for Directory {

impl Directory {
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
pub fn find_tile_id(&self, tile_id: u64) -> Option<&Entry> {
pub fn find_tile_id(&self, tile_id: u64) -> Option<&DirEntry> {
match self.entries.binary_search_by(|e| e.tile_id.cmp(&tile_id)) {
Ok(idx) => self.entries.get(idx),
Err(next_id) => {
@@ -44,7 +45,7 @@ impl TryFrom<Bytes> for Directory {
let mut buffer = buffer.reader();
let n_entries = buffer.read_usize_varint()?;

let mut entries = vec![Entry::default(); n_entries];
let mut entries = vec![DirEntry::default(); n_entries];

// Read tile IDs
let mut next_tile_id = 0;
@@ -64,7 +65,7 @@ impl TryFrom<Bytes> for Directory {
}

// Read Offsets
let mut last_entry: Option<&Entry> = None;
let mut last_entry: Option<&DirEntry> = None;
for entry in entries.iter_mut() {
let offset = buffer.read_u64_varint()?;
entry.offset = if offset == 0 {
@@ -81,16 +82,16 @@ impl TryFrom<Bytes> for Directory {
}

#[derive(Clone, Default, Debug)]
pub(crate) struct Entry {
pub struct DirEntry {
pub(crate) tile_id: u64,
pub(crate) offset: u64,
pub(crate) length: u32,
pub(crate) run_length: u32,
}

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
impl Entry {
pub fn is_leaf(&self) -> bool {
impl DirEntry {
pub(crate) fn is_leaf(&self) -> bool {
self.run_length == 0
}
}
10 changes: 7 additions & 3 deletions src/lib.rs
@@ -1,16 +1,18 @@
#![forbid(unsafe_code)]

mod tile;

mod header;
pub use crate::header::{Compression, Header, TileType};

mod directory;
pub use directory::{DirEntry, Directory};

mod error;
pub use error::PmtError;
#[cfg(feature = "http-async")]
pub use error::PmtHttpError;

mod header;

#[cfg(feature = "http-async")]
pub mod http;

@@ -19,7 +21,9 @@ pub mod mmap;

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
pub mod async_reader;
pub mod tile;

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
pub mod cache;

#[cfg(test)]
mod tests {
