Skip to content

Commit

Permalink
import: cherry-pick fixup
Browse files Browse the repository at this point in the history
Signed-off-by: kennytm <[email protected]>
  • Loading branch information
kennytm committed Oct 21, 2019
1 parent 2aa3a99 commit 7771a86
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 40 deletions.
15 changes: 8 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ log_wrappers = { path = "components/log_wrappers" }
engine = { path = "components/engine" }
tikv_util = { path = "components/tikv_util" }
farmhash = "1.1.5"
external_storage = { path = "components/external_storage" }

[dependencies.murmur3]
git = "https://github.com/pingcap/murmur3.git"
Expand Down
12 changes: 12 additions & 0 deletions src/import/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ quick_error! {
ResourceTemporarilyUnavailable(msg: String) {
display("{}", msg)
}
CannotReadExternalStorage(url: String, name: String, err: IoError) {
cause(err)
display("Cannot read {}/{}", url, name)
}
WrongKeyPrefix(what: &'static str, key: Vec<u8>, prefix: Vec<u8>) {
display("\
{} has wrong prefix: key {} does not start with {}",
what,
hex::encode_upper(&key),
hex::encode_upper(&prefix),
)
}
}
}

Expand Down
55 changes: 27 additions & 28 deletions src/import/sst_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use engine::rocks::{IngestExternalFileOptions, SeekKey, SstReader, SstWriterBuil
use external_storage::create_storage;

use super::{Error, Result};
use crate::raftstore::store::keys;

/// SSTImporter manages SST files that are waiting for ingesting.
pub struct SSTImporter {
Expand Down Expand Up @@ -83,7 +84,7 @@ impl SSTImporter {
// region info in PD.
pub fn download(
&self,
meta: &SstMeta,
meta: &SSTMeta,
url: &str,
name: &str,
rewrite_rule: &RewriteRule,
Expand All @@ -110,7 +111,7 @@ impl SSTImporter {

fn do_download(
&self,
meta: &SstMeta,
meta: &SSTMeta,
url: &str,
name: &str,
rewrite_rule: &RewriteRule,
Expand Down Expand Up @@ -278,7 +279,7 @@ impl SSTImporter {
}
}

pub fn list_ssts(&self) -> Result<Vec<SstMeta>> {
pub fn list_ssts(&self) -> Result<Vec<SSTMeta>> {
self.dir.list_ssts()
}
}
Expand Down Expand Up @@ -384,9 +385,7 @@ impl ImportDir {
let path = e.path();
match path_to_sst_meta(&path) {
Ok(sst) => ssts.push(sst),
Err(e) => {
error!("path_to_sst_meta failed"; "path" => %path.to_str().unwrap(), "err" => %e)
}
Err(e) => error!("path_to_sst_meta failed"; "path" => %path.to_str().unwrap(), "err" => %e),
}
}
Ok(ssts)
Expand Down Expand Up @@ -671,8 +670,8 @@ mod tests {
assert_eq!(meta, new_meta);
}

fn create_sample_external_sst_file() -> Result<(tempfile::TempDir, SstMeta)> {
let ext_sst_dir = tempfile::tempdir()?;
fn create_sample_external_sst_file() -> Result<(TempDir, SSTMeta)> {
let ext_sst_dir = TempDir::new("external_storage")?;
let mut sst_writer = SstWriterBuilder::new()
.build(ext_sst_dir.path().join("sample.sst").to_str().unwrap())?;
sst_writer.put(b"zt123_r01", b"abc")?;
Expand All @@ -683,7 +682,7 @@ mod tests {
let sst_info = sst_writer.finish()?;

// make up the SST meta for downloading.
let mut meta = SstMeta::default();
let mut meta = SSTMeta::default();
let uuid = Uuid::new_v4();
meta.set_uuid(uuid.as_bytes().to_vec());
meta.set_cf_name("default".to_owned());
Expand All @@ -708,8 +707,8 @@ mod tests {
let (ext_sst_dir, meta) = create_sample_external_sst_file().unwrap();

// performs the download.
let importer_dir = tempfile::tempdir().unwrap();
let importer = SSTImporter::new(&importer_dir).unwrap();
let importer_dir = TempDir::new("importer_dir").unwrap();
let importer = SSTImporter::new(importer_dir.path()).unwrap();

let range = importer
.download(
Expand Down Expand Up @@ -755,8 +754,8 @@ mod tests {
let (ext_sst_dir, meta) = create_sample_external_sst_file().unwrap();

// performs the download.
let importer_dir = tempfile::tempdir().unwrap();
let importer = SSTImporter::new(&importer_dir).unwrap();
let importer_dir = TempDir::new("importer_dir").unwrap();
let importer = SSTImporter::new(importer_dir.path()).unwrap();

let range = importer
.download(
Expand Down Expand Up @@ -799,8 +798,8 @@ mod tests {
let (ext_sst_dir, mut meta) = create_sample_external_sst_file().unwrap();

// performs the download.
let importer_dir = tempfile::tempdir().unwrap();
let importer = SSTImporter::new(&importer_dir).unwrap();
let importer_dir = TempDir::new("importer_dir").unwrap();
let importer = SSTImporter::new(importer_dir.path()).unwrap();

let range = importer
.download(
Expand All @@ -817,7 +816,7 @@ mod tests {
assert_eq!(range.get_end(), b"t9102_r14");

// performs the ingest
let ingest_dir = tempfile::tempdir().unwrap();
let ingest_dir = TempDir::new("ingest_dir").unwrap();
let db = new_engine(
ingest_dir.path().to_str().unwrap(),
None,
Expand Down Expand Up @@ -847,8 +846,8 @@ mod tests {
#[test]
fn test_download_sst_partial_range() {
let (ext_sst_dir, mut meta) = create_sample_external_sst_file().unwrap();
let importer_dir = tempfile::tempdir().unwrap();
let importer = SSTImporter::new(&importer_dir).unwrap();
let importer_dir = TempDir::new("importer_dir").unwrap();
let importer = SSTImporter::new(importer_dir.path()).unwrap();

// note: the range doesn't contain the DATA_PREFIX 'z'.
meta.mut_range().set_start(b"t123_r02".to_vec());
Expand Down Expand Up @@ -890,8 +889,8 @@ mod tests {
#[test]
fn test_download_sst_partial_range_with_key_rewrite() {
let (ext_sst_dir, mut meta) = create_sample_external_sst_file().unwrap();
let importer_dir = tempfile::tempdir().unwrap();
let importer = SSTImporter::new(&importer_dir).unwrap();
let importer_dir = TempDir::new("importer_dir").unwrap();
let importer = SSTImporter::new(importer_dir.path()).unwrap();

meta.mut_range().set_start(b"t5_r02".to_vec());
meta.mut_range().set_end(b"t5_r13".to_vec());
Expand Down Expand Up @@ -930,13 +929,13 @@ mod tests {

#[test]
fn test_download_sst_invalid() {
let ext_sst_dir = tempfile::tempdir().unwrap();
let ext_sst_dir = TempDir::new("external_storage").unwrap();
fs::write(ext_sst_dir.path().join("sample.sst"), b"not an SST file").unwrap();

let importer_dir = tempfile::tempdir().unwrap();
let importer = SSTImporter::new(&importer_dir).unwrap();
let importer_dir = TempDir::new("importer_dir").unwrap();
let importer = SSTImporter::new(importer_dir.path()).unwrap();

let mut meta = SstMeta::new();
let mut meta = SSTMeta::new();
meta.set_uuid(vec![0u8; 16]);

let result = importer.download(
Expand All @@ -955,8 +954,8 @@ mod tests {
#[test]
fn test_download_sst_empty() {
let (ext_sst_dir, mut meta) = create_sample_external_sst_file().unwrap();
let importer_dir = tempfile::tempdir().unwrap();
let importer = SSTImporter::new(&importer_dir).unwrap();
let importer_dir = TempDir::new("importer_dir").unwrap();
let importer = SSTImporter::new(importer_dir.path()).unwrap();

meta.mut_range().set_start(vec![b'x']);
meta.mut_range().set_end(vec![b'y']);
Expand All @@ -978,8 +977,8 @@ mod tests {
#[test]
fn test_download_sst_wrong_key_prefix() {
let (ext_sst_dir, meta) = create_sample_external_sst_file().unwrap();
let importer_dir = tempfile::tempdir().unwrap();
let importer = SSTImporter::new(&importer_dir).unwrap();
let importer_dir = TempDir::new("importer_dir").unwrap();
let importer = SSTImporter::new(importer_dir.path()).unwrap();

let result = importer.download(
&meta,
Expand Down
7 changes: 2 additions & 5 deletions tests/integrations/import/sst_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,7 @@ fn test_download_sst() {
use grpcio::{Error, RpcStatus};

let (_cluster, ctx, tikv, import) = new_cluster_and_tikv_import_client();
let temp_dir = Builder::new()
.prefix("test_download_sst")
.tempdir()
.unwrap();
let temp_dir = TempDir::new("test_download_sst").unwrap();

let sst_path = temp_dir.path().join("test.sst");
let sst_range = (0, 100);
Expand Down Expand Up @@ -255,7 +252,7 @@ fn check_ingested_kvs(tikv: &TikvClient, ctx: &Context, sst_range: (u8, u8)) {
}
}

fn check_sst_deleted(client: &ImportSstClient, meta: &SstMeta, data: &[u8]) {
fn check_sst_deleted(client: &ImportSstClient, meta: &SSTMeta, data: &[u8]) {
for _ in 0..10 {
if send_upload_sst(client, meta, data).is_ok() {
// If we can upload the file, it means the previous file has been deleted.
Expand Down

0 comments on commit 7771a86

Please sign in to comment.