Skip to content

Commit

Permalink
Check timeout more often during compression (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
kornelski authored and shssoichiro committed Feb 1, 2019
1 parent 33890cf commit dad6230
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 32 deletions.
19 changes: 12 additions & 7 deletions src/deflate/cfzlib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use Deadline;
use atomicmin::AtomicMin;
pub use cloudflare_zlib::is_supported;
use cloudflare_zlib::*;
Expand All @@ -13,24 +14,27 @@ impl From<ZError> for PngError {
}
}

pub fn cfzlib_deflate(
pub(crate) fn cfzlib_deflate(
data: &[u8],
level: u8,
strategy: u8,
window_bits: u8,
max_size: &AtomicMin,
deadline: &Deadline,
) -> PngResult<Vec<u8>> {
let mut stream = Deflate::new(level.into(), strategy.into(), window_bits.into())?;
stream.reserve(max_size.get().unwrap_or(data.len() / 2));
let max_size = max_size.as_atomic_usize();
// max size is generally checked after each split,
// so splitting the buffer into pieces gices more checks
// so splitting the buffer into pieces gives more checks
// = better chance of hitting it sooner.
let (first, rest) = data.split_at(data.len() / 2);
stream.compress_with_limit(first, max_size)?;
let (rest1, rest2) = rest.split_at(rest.len() / 2);
stream.compress_with_limit(rest1, max_size)?;
stream.compress_with_limit(rest2, max_size)?;
let chunk_size = (data.len()/4).max(1<<15).min(1<<18); // 32-256KB
for chunk in data.chunks(chunk_size) {
stream.compress_with_limit(chunk, max_size)?;
if deadline.passed() {
return Err(PngError::TimedOut);
}
}
Ok(stream.finish()?)
}

Expand All @@ -42,6 +46,7 @@ fn compress_test() {
Z_DEFAULT_STRATEGY as u8,
15,
&AtomicMin::new(None),
&Deadline::new(None, false),
).unwrap();
let res = ::deflate::inflate(&vec).unwrap();
assert_eq!(&res, b"azxcvbnm");
Expand Down
5 changes: 3 additions & 2 deletions src/deflate/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use Deadline;
use atomicmin::AtomicMin;
use error::PngError;
use miniz_oxide;
Expand Down Expand Up @@ -25,11 +26,11 @@ pub fn inflate(data: &[u8]) -> PngResult<Vec<u8>> {
}

/// Compress a data stream using the DEFLATE algorithm
pub fn deflate(data: &[u8], zc: u8, zs: u8, zw: u8, max_size: &AtomicMin) -> PngResult<Vec<u8>> {
pub(crate) fn deflate(data: &[u8], zc: u8, zs: u8, zw: u8, max_size: &AtomicMin, deadline: &Deadline) -> PngResult<Vec<u8>> {
#[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))]
{
if cfzlib::is_supported() {
return cfzlib::cfzlib_deflate(data, zc, zs, zw, max_size);
return cfzlib::cfzlib_deflate(data, zc, zs, zw, max_size, deadline);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::fmt;
#[derive(Debug, Clone)]
pub enum PngError {
DeflatedDataTooLong(usize),
TimedOut,
NotPNG,
APNGNotSupported,
InvalidData,
Expand All @@ -28,6 +29,7 @@ impl fmt::Display for PngError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
PngError::DeflatedDataTooLong(_) => f.write_str("deflated data too long"),
PngError::TimedOut => f.write_str("timed out"),
PngError::NotPNG => f.write_str("Invalid header detected; Not a PNG file"),
PngError::InvalidData => f.write_str("Invalid data found; unable to read PNG file"),
PngError::TruncatedData => {
Expand Down
13 changes: 9 additions & 4 deletions src/evaluate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use atomicmin::AtomicMin;
use deflate;
use Deadline;
use png::PngData;
use png::PngImage;
use png::STD_COMPRESSION;
Expand All @@ -16,20 +17,20 @@ use std::sync::Mutex;
use std::thread;

/// Collect image versions and pick one that compresses best
pub struct Evaluator {
pub(crate) struct Evaluator {
/// images are sent to the thread for evaluation
eval_send: Option<SyncSender<(Arc<PngImage>, f32, bool)>>,
// the thread helps evaluate images asynchronously
eval_thread: thread::JoinHandle<Option<PngData>>,
}

impl Evaluator {
pub fn new() -> Self {
pub fn new(deadline: Arc<Deadline>) -> Self {
// queue size ensures we're not using too much memory for pending reductions
let (tx, rx) = sync_channel(4);
Self {
eval_send: Some(tx),
eval_thread: thread::spawn(move || Self::evaluate_images(rx)),
eval_thread: thread::spawn(move || Self::evaluate_images(rx, deadline)),
}
}

Expand Down Expand Up @@ -57,20 +58,24 @@ impl Evaluator {
}

/// Main loop of evaluation thread
fn evaluate_images(from_channel: Receiver<(Arc<PngImage>, f32, bool)>) -> Option<PngData> {
fn evaluate_images(from_channel: Receiver<(Arc<PngImage>, f32, bool)>, deadline: Arc<Deadline>) -> Option<PngData> {
let best_candidate_size = AtomicMin::new(None);
let best_result: Mutex<Option<(PngData, _, _)>> = Mutex::new(None);
// ends when sender is dropped
for (nth, (image, bias, is_reduction)) in from_channel.iter().enumerate() {
let filters_iter = STD_FILTERS.par_iter().with_max_len(1);

filters_iter.for_each(|&f| {
if deadline.passed() {
return;
}
if let Ok(idat_data) = deflate::deflate(
&image.filter_image(f),
STD_COMPRESSION,
STD_STRATEGY,
STD_WINDOW,
&best_candidate_size,
&deadline,
) {
let mut res = best_result.lock().unwrap();
if best_candidate_size.get().map_or(true, |old_best_len| {
Expand Down
42 changes: 26 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ pub fn optimize(input: &InFile, output: &OutFile, opts: &Options) -> PngResult<(
eprintln!("Processing: {}", input);
}

let deadline = Arc::new(Deadline::new(opts.timeout, opts.verbosity.is_some()));

let in_data = match *input {
InFile::Path(ref input_path) => PngData::read_file(input_path)?,
InFile::StdIn => {
Expand All @@ -354,7 +356,7 @@ pub fn optimize(input: &InFile, output: &OutFile, opts: &Options) -> PngResult<(
let mut png = PngData::from_slice(&in_data, opts.fix_errors)?;

// Run the optimizer on the decoded PNG.
let mut optimized_output = optimize_png(&mut png, &in_data, opts)?;
let mut optimized_output = optimize_png(&mut png, &in_data, opts, deadline)?;

if is_fully_optimized(in_data.len(), optimized_output.len(), opts) {
if opts.verbosity.is_some() {
Expand Down Expand Up @@ -437,11 +439,14 @@ pub fn optimize_from_memory(data: &[u8], opts: &Options) -> PngResult<Vec<u8>> {
if opts.verbosity.is_some() {
eprintln!("Processing from memory");
}

let deadline = Arc::new(Deadline::new(opts.timeout, opts.verbosity.is_some()));

let original_size = data.len() as usize;
let mut png = PngData::from_slice(data, opts.fix_errors)?;

// Run the optimizer on the decoded PNG.
let optimized_output = optimize_png(&mut png, data, opts)?;
let optimized_output = optimize_png(&mut png, data, opts, deadline)?;

if is_fully_optimized(original_size, optimized_output.len(), opts) {
eprintln!("Image already optimized");
Expand All @@ -460,11 +465,9 @@ struct TrialOptions {
}

/// Perform optimization on the input PNG object using the options provided
fn optimize_png(png: &mut PngData, original_data: &[u8], opts: &Options) -> PngResult<Vec<u8>> {
fn optimize_png(png: &mut PngData, original_data: &[u8], opts: &Options, deadline: Arc<Deadline>) -> PngResult<Vec<u8>> {
type TrialWithData = (TrialOptions, Vec<u8>);

let deadline = Deadline::new(opts);

let original_png = png.clone();

// Print png info
Expand Down Expand Up @@ -519,7 +522,7 @@ fn optimize_png(png: &mut PngData, original_data: &[u8], opts: &Options) -> PngR
}

// This will collect all versions of images and pick one that compresses best
let eval = Evaluator::new();
let eval = Evaluator::new(deadline.clone());
// Usually we want transformations that are smaller than the unmodified original,
// but if we're interlacing, we have to accept a possible file size increase.
if opts.interlace.is_none() {
Expand All @@ -535,15 +538,12 @@ fn optimize_png(png: &mut PngData, original_data: &[u8], opts: &Options) -> PngR

if opts.idat_recoding || reduction_occurred {
// Go through selected permutations and determine the best
let combinations = if opts.deflate == Deflaters::Zlib {
let combinations = if opts.deflate == Deflaters::Zlib && !deadline.passed() {
filter.len() * compression.len() * strategies.len()
} else {
filter.len()
};
let mut results: Vec<TrialOptions> = Vec::with_capacity(combinations);
if opts.verbosity.is_some() {
eprintln!("Trying: {} combinations", combinations);
}

for f in &filter {
if opts.deflate == Deflaters::Zlib {
Expand All @@ -555,6 +555,9 @@ fn optimize_png(png: &mut PngData, original_data: &[u8], opts: &Options) -> PngR
strategy: *zs,
});
}
if deadline.passed() {
break;
}
}
} else {
// Zopfli compression has no additional options
Expand All @@ -570,6 +573,10 @@ fn optimize_png(png: &mut PngData, original_data: &[u8], opts: &Options) -> PngR
}
}

if opts.verbosity.is_some() {
eprintln!("Trying: {} combinations", results.len());
}

let filter_iter = filter.par_iter().with_max_len(1);
let filters: HashMap<u8, Vec<u8>> = filter_iter
.map(|f| {
Expand All @@ -594,10 +601,12 @@ fn optimize_png(png: &mut PngData, original_data: &[u8], opts: &Options) -> PngR
trial.strategy,
opts.window,
&best_size,
&deadline,
)
} else {
deflate::zopfli_deflate(filtered)
};

let new_idat = match new_idat {
Ok(n) => n,
Err(PngError::DeflatedDataTooLong(max)) if opts.verbosity == Some(1) => {
Expand Down Expand Up @@ -775,18 +784,18 @@ fn perform_reductions(mut png: Arc<PngImage>, opts: &Options, deadline: &Deadlin
}

/// Keep track of processing timeout
struct Deadline {
pub(crate) struct Deadline {
start: Instant,
timeout: Option<Duration>,
print_message: AtomicBool,
}

impl Deadline {
pub fn new(opts: &Options) -> Self {
pub fn new(timeout: Option<Duration>, verbose: bool) -> Self {
Self {
start: Instant::now(),
timeout: opts.timeout,
print_message: AtomicBool::new(opts.verbosity.is_some()),
timeout,
print_message: AtomicBool::new(verbose),
}
}

Expand All @@ -795,10 +804,11 @@ impl Deadline {
/// If the verbose option is on, it also prints a timeout message once.
pub fn passed(&self) -> bool {
if let Some(timeout) = self.timeout {
if self.start.elapsed() > timeout {
let elapsed = self.start.elapsed();
if elapsed > timeout {
if self.print_message.load(Ordering::Relaxed) {
self.print_message.store(false, Ordering::Relaxed);
eprintln!("Timed out after {} second(s)", timeout.as_secs());
eprintln!("Timed out after {} second(s)", elapsed.as_secs());
}
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/reduction/alpha.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use png::PngImage;
use colors::ColorType;
use rayon::prelude::*;

pub fn try_alpha_reductions(png: Arc<PngImage>, alphas: &HashSet<AlphaOptim>, eval: &Evaluator) {
pub(crate) fn try_alpha_reductions(png: Arc<PngImage>, alphas: &HashSet<AlphaOptim>, eval: &Evaluator) {
assert!(!alphas.is_empty());
let alphas = alphas.iter().collect::<Vec<_>>();
let alphas_iter = alphas.par_iter().with_max_len(1);
Expand Down
4 changes: 2 additions & 2 deletions src/reduction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use bit_depth::*;
pub mod color;
use color::*;

pub use bit_depth::reduce_bit_depth;
pub use alpha::try_alpha_reductions;
pub(crate) use bit_depth::reduce_bit_depth;
pub(crate) use alpha::try_alpha_reductions;

/// Attempt to reduce the number of colors in the palette
/// Returns `None` if palette hasn't changed
Expand Down

0 comments on commit dad6230

Please sign in to comment.