From d2ffc62a8d6cc830d780bff77008c8203edc4868 Mon Sep 17 00:00:00 2001 From: ris <79858083+RidRisR@users.noreply.github.com> Date: Mon, 8 Jul 2024 11:28:33 +0800 Subject: [PATCH] This is an automated cherry-pick of #17238 close tikv/tikv#17224 Signed-off-by: ti-chi-bot --- src/import/sst_service.rs | 9 ++++ tests/failpoints/cases/test_import_service.rs | 48 +++++++++++++++++++ tests/integrations/import/test_apply_log.rs | 33 +++++++++++++ 3 files changed, 90 insertions(+) diff --git a/src/import/sst_service.rs b/src/import/sst_service.rs index d9c8b64a681..ce5fb46d185 100644 --- a/src/import/sst_service.rs +++ b/src/import/sst_service.rs @@ -846,6 +846,10 @@ impl ImportSst for ImportSstService { .observe(start.saturating_elapsed().as_secs_f64()); let mut resp = ApplyResponse::default(); + if get_disk_status(0) != DiskUsage::Normal { + resp.set_error(Error::DiskSpaceNotEnough.into()); + return crate::send_rpc_response!(Ok(resp), sink, label, start); + } match Self::apply_imp(req, importer, applier, limiter, max_raft_size).await { Ok(Some(r)) => resp.set_range(r), @@ -879,6 +883,11 @@ impl ImportSst for ImportSstService { sst_importer::metrics::IMPORTER_DOWNLOAD_DURATION .with_label_values(&["queue"]) .observe(start.saturating_elapsed().as_secs_f64()); + if get_disk_status(0) != DiskUsage::Normal { + let mut resp = DownloadResponse::default(); + resp.set_error(Error::DiskSpaceNotEnough.into()); + return crate::send_rpc_response!(Ok(resp), sink, label, timer); + } // FIXME: download() should be an async fn, to allow BR to cancel // a download task. diff --git a/tests/failpoints/cases/test_import_service.rs b/tests/failpoints/cases/test_import_service.rs index 475acbe9f3c..2e2456537e5 100644 --- a/tests/failpoints/cases/test_import_service.rs +++ b/tests/failpoints/cases/test_import_service.rs @@ -6,6 +6,7 @@ use std::{ }; use file_system::calc_crc32; +<<<<<<< HEAD use futures::{executor::block_on, stream, SinkExt}; use grpcio::{Result, WriteFlags}; use kvproto::import_sstpb::*; @@ -13,6 +14,16 @@ use tempfile::Builder; use test_raftstore::Simulator; use test_sst_importer::*; use tikv_util::HandyRwLock; +======= +use futures::executor::block_on; +use grpcio::{ChannelBuilder, Environment}; +use kvproto::{disk_usage::DiskUsage, import_sstpb::*, tikvpb_grpc::TikvClient}; +use tempfile::{Builder, TempDir}; +use test_raftstore::{must_raw_put, Simulator}; +use test_sst_importer::*; +use tikv::config::TikvConfig; +use tikv_util::{config::ReadableSize, sys::disk, HandyRwLock}; +>>>>>>> 485c434512 (br: pre-check TiKV disk space before download (#17238)) #[allow(dead_code)] #[path = "../../integrations/import/util.rs"] @@ -89,6 +100,43 @@ fn upload_sst(import: &ImportSstClient, meta: &SstMeta, data: &[u8]) -> Result>>>>>> 485c434512 (br: pre-check TiKV disk space before download (#17238)) use crate::import::util; @@ -34,6 +39,34 @@ fn test_basic_apply() { ); } +#[test] +fn test_apply_full_disk() { + let (_cluster, ctx, _tikv, import) = new_cluster_and_tikv_import_client(); + let tmp = TempDir::new().unwrap(); + let storage = LocalStorage::new(tmp.path()).unwrap(); + let default = [ + (b"k1", b"v1", 1), + (b"k2", b"v2", 2), + (b"k3", b"v3", 3), + (b"k4", b"v4", 4), + ]; + let mut sst_meta = make_plain_file(&storage, "file1.log", default.into_iter()); + register_range_for(&mut sst_meta, b"k1", b"k3a"); + let mut req = ApplyRequest::new(); + req.set_context(ctx.clone()); + req.set_rewrite_rules(vec![rewrite_for(&mut sst_meta, b"k", b"r")].into()); + req.set_metas(vec![sst_meta].into()); + req.set_storage_backend(local_storage(&tmp)); + disk::set_disk_status(DiskUsage::AlmostFull); + let result = import.apply(&req).unwrap(); + assert!(result.has_error()); + assert_eq!( + result.get_error().get_message(), + "TiKV disk space is not enough." + ); + disk::set_disk_status(DiskUsage::Normal); +} + #[test] fn test_apply_twice() { let (_cluster, ctx, tikv, import) = util::new_cluster_and_tikv_import_client();