Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#17238
Browse files Browse the repository at this point in the history
close tikv#17224

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
RidRisR authored and ti-chi-bot committed Oct 31, 2024
1 parent 0f2863c commit d2ffc62
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/import/sst_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,10 @@ impl<E: Engine> ImportSst for ImportSstService<E> {
.observe(start.saturating_elapsed().as_secs_f64());

let mut resp = ApplyResponse::default();
if get_disk_status(0) != DiskUsage::Normal {
resp.set_error(Error::DiskSpaceNotEnough.into());
return crate::send_rpc_response!(Ok(resp), sink, label, start);
}

match Self::apply_imp(req, importer, applier, limiter, max_raft_size).await {
Ok(Some(r)) => resp.set_range(r),
Expand Down Expand Up @@ -879,6 +883,11 @@ impl<E: Engine> ImportSst for ImportSstService<E> {
sst_importer::metrics::IMPORTER_DOWNLOAD_DURATION
.with_label_values(&["queue"])
.observe(start.saturating_elapsed().as_secs_f64());
if get_disk_status(0) != DiskUsage::Normal {
let mut resp = DownloadResponse::default();
resp.set_error(Error::DiskSpaceNotEnough.into());
return crate::send_rpc_response!(Ok(resp), sink, label, timer);
}

// FIXME: download() should be an async fn, to allow BR to cancel
// a download task.
Expand Down
48 changes: 48 additions & 0 deletions tests/failpoints/cases/test_import_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,24 @@ use std::{
};

use file_system::calc_crc32;
<<<<<<< HEAD
use futures::{executor::block_on, stream, SinkExt};
use grpcio::{Result, WriteFlags};
use kvproto::import_sstpb::*;
use tempfile::Builder;
use test_raftstore::Simulator;
use test_sst_importer::*;
use tikv_util::HandyRwLock;
=======
use futures::executor::block_on;
use grpcio::{ChannelBuilder, Environment};
use kvproto::{disk_usage::DiskUsage, import_sstpb::*, tikvpb_grpc::TikvClient};
use tempfile::{Builder, TempDir};
use test_raftstore::{must_raw_put, Simulator};
use test_sst_importer::*;
use tikv::config::TikvConfig;
use tikv_util::{config::ReadableSize, sys::disk, HandyRwLock};
>>>>>>> 485c434512 (br: pre-check TiKV disk space before download (#17238))

#[allow(dead_code)]
#[path = "../../integrations/import/util.rs"]
Expand Down Expand Up @@ -89,6 +100,43 @@ fn upload_sst(import: &ImportSstClient, meta: &SstMeta, data: &[u8]) -> Result<U
})
}

#[test]
fn test_download_to_full_disk() {
let (_cluster, ctx, _tikv, import) = new_cluster_and_tikv_import_client();
let temp_dir = Builder::new()
.prefix("test_download_sst_blocking_sst_writer")
.tempdir()
.unwrap();

let sst_path = temp_dir.path().join("test.sst");
let sst_range = (0, 100);
let (mut meta, _) = gen_sst_file(sst_path, sst_range);
meta.set_region_id(ctx.get_region_id());
meta.set_region_epoch(ctx.get_region_epoch().clone());

// Now perform a proper download.
let mut download = DownloadRequest::default();
download.set_sst(meta.clone());
download.set_storage_backend(external_storage::make_local_backend(temp_dir.path()));
download.set_name("test.sst".to_owned());
download.mut_sst().mut_range().set_start(vec![sst_range.1]);
download
.mut_sst()
.mut_range()
.set_end(vec![sst_range.1 + 1]);
download.mut_sst().mut_range().set_start(Vec::new());
download.mut_sst().mut_range().set_end(Vec::new());
disk::set_disk_status(DiskUsage::AlmostFull);
let result = import.download(&download).unwrap();
assert!(!result.get_is_empty());
assert!(result.has_error());
assert_eq!(
result.get_error().get_message(),
"TiKV disk space is not enough."
);
disk::set_disk_status(DiskUsage::Normal);
}

#[test]
fn test_ingest_reentrant() {
let (cluster, ctx, _tikv, import) = new_cluster_and_tikv_import_client();
Expand Down
33 changes: 33 additions & 0 deletions tests/integrations/import/test_apply_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ use engine_traits::CF_DEFAULT;
use external_storage_export::LocalStorage;
use kvproto::import_sstpb::ApplyRequest;
use tempfile::TempDir;
<<<<<<< HEAD
=======
use test_sst_importer::*;
use tikv_util::sys::disk::{self, DiskUsage};
>>>>>>> 485c434512 (br: pre-check TiKV disk space before download (#17238))

use crate::import::util;

Expand Down Expand Up @@ -34,6 +39,34 @@ fn test_basic_apply() {
);
}

#[test]
fn test_apply_full_disk() {
let (_cluster, ctx, _tikv, import) = new_cluster_and_tikv_import_client();
let tmp = TempDir::new().unwrap();
let storage = LocalStorage::new(tmp.path()).unwrap();
let default = [
(b"k1", b"v1", 1),
(b"k2", b"v2", 2),
(b"k3", b"v3", 3),
(b"k4", b"v4", 4),
];
let mut sst_meta = make_plain_file(&storage, "file1.log", default.into_iter());
register_range_for(&mut sst_meta, b"k1", b"k3a");
let mut req = ApplyRequest::new();
req.set_context(ctx.clone());
req.set_rewrite_rules(vec![rewrite_for(&mut sst_meta, b"k", b"r")].into());
req.set_metas(vec![sst_meta].into());
req.set_storage_backend(local_storage(&tmp));
disk::set_disk_status(DiskUsage::AlmostFull);
let result = import.apply(&req).unwrap();
assert!(result.has_error());
assert_eq!(
result.get_error().get_message(),
"TiKV disk space is not enough."
);
disk::set_disk_status(DiskUsage::Normal);
}

#[test]
fn test_apply_twice() {
let (_cluster, ctx, tikv, import) = util::new_cluster_and_tikv_import_client();
Expand Down

0 comments on commit d2ffc62

Please sign in to comment.