Skip to content

Commit

Permalink
fix: skip empty chunks when decoding gz
Browse files Browse the repository at this point in the history
  • Loading branch information
MicaiahReid committed Oct 18, 2023
1 parent f1b89f7 commit b4ce82f
Showing 1 changed file with 38 additions and 31 deletions.
69 changes: 38 additions & 31 deletions components/chainhook-cli/src/archive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,48 +41,55 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {

// Download chunks
let (tx, rx) = flume::bounded(0);
destination_path.push(default_tsv_file_path(&config.network.stacks_network));

let decoder_thread = std::thread::spawn(move || {
let input = ChannelRead::new(rx);
let mut decoder = GzDecoder::new(input);
let mut file = fs::File::create(&destination_path).unwrap();
let mut buffer = [0; 512_000];
loop {
match decoder.read(&mut buffer) {
Ok(0) => break,
Ok(n) => {
if let Err(e) = file.write_all(&buffer[..n]) {
let err = format!("unable to update compressed archive: {}", e.to_string());

if res.status() == reqwest::StatusCode::OK {
destination_path.push(default_tsv_file_path(&config.network.stacks_network));

let decoder_thread = std::thread::spawn(move || {
let mut file = fs::File::create(&destination_path).unwrap();
let input = ChannelRead::new(rx);
let mut decoder = GzDecoder::new(input);
let mut buffer = [0; 512_000];
loop {
match decoder.read(&mut buffer) {
Ok(0) => break,
Ok(n) => {
if let Err(e) = file.write_all(&buffer[..n]) {
let err =
format!("unable to update compressed archive: {}", e.to_string());
return Err(err);
}
}
Err(e) => {
let err = format!("unable to write compressed archive: {}", e.to_string());
return Err(err);
}
}
Err(e) => {
let err = format!("unable to write compressed archive: {}", e.to_string());
return Err(err);
}
}
}
let _ = file.flush();
Ok(())
});

if res.status() == reqwest::StatusCode::OK {
let _ = file.flush();
Ok(())
});
let mut stream = res.bytes_stream();
while let Some(item) = stream.next().await {
let chunk = item.or(Err(format!("Error while downloading file")))?;
let chunk = match item {
Ok(i) => Ok(i),
Err(e) => Err(format!("Error while downloading file {}", e.to_string())),
}?;
if chunk.is_empty() {
continue;
}
tx.send_async(chunk.to_vec())
.await
.map_err(|e| format!("unable to download stacks event: {}", e.to_string()))?;
.map_err(|e| format!("unable to download stacks archive: {}", e.to_string()))?;
}
drop(tx);
}

tokio::task::spawn_blocking(|| decoder_thread.join())
.await
.unwrap()
.unwrap()
.unwrap();
tokio::task::spawn_blocking(|| decoder_thread.join())
.await
.unwrap()
.unwrap()
.unwrap();
}

Ok(())
}
Expand Down

0 comments on commit b4ce82f

Please sign in to comment.