
Add ImportKV service #2881

Merged
merged 2 commits into from
Apr 4, 2018
Conversation

huachaohuang
Contributor

Implement Open/Write/Close APIs to write key-value pairs to import
server.

Open opens an engine identified by a UUID.

Write creates a write stream that writes key-value batches to the opened
engine. Different clients can write to the same engine concurrently.

Close closes the engine after all write batches are finished. An
engine can only be closed when all write streams are closed. An engine
can only be closed once, and it cannot be opened again once it is
closed.

After these three steps, the data in the engine is ready to be
imported to TiKV, but the Import API is not included in this PR.
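The Open/Write/Close lifecycle above can be sketched as a small state machine. Everything here (`ImportServer`, `EngineState`, the error strings) is hypothetical and not the PR's actual types; it only illustrates the documented rules: opening twice is a no-op, closing requires all write streams to be closed, and a closed engine cannot be reopened.

```rust
use std::collections::HashMap;

// Hypothetical sketch of the engine lifecycle; not the PR's real types.
enum EngineState {
    Open { active_streams: usize },
    Closed,
}

struct ImportServer {
    engines: HashMap<u128, EngineState>, // keyed by engine UUID
}

impl ImportServer {
    fn open(&mut self, uuid: u128) -> Result<(), String> {
        if let Some(state) = self.engines.get(&uuid) {
            return match state {
                // A closed engine can never be opened again.
                EngineState::Closed => Err("engine already closed".to_owned()),
                // Opening an already-open engine is a no-op, so concurrent
                // clients can all call Open with the same UUID.
                EngineState::Open { .. } => Ok(()),
            };
        }
        self.engines.insert(uuid, EngineState::Open { active_streams: 0 });
        Ok(())
    }

    fn close(&mut self, uuid: u128) -> Result<(), String> {
        let streams = match self.engines.get(&uuid) {
            Some(&EngineState::Open { active_streams }) => active_streams,
            _ => return Err("engine is not open".to_owned()),
        };
        if streams > 0 {
            // Close is only legal after every write stream has finished.
            return Err("write streams still open".to_owned());
        }
        self.engines.insert(uuid, EngineState::Closed);
        Ok(())
    }
}
```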

@breezewish
Member

breezewish commented Mar 30, 2018

Conceptually, what's the relationship between engine, engineFile, client, token, and uuid?

@huachaohuang
Contributor Author

Engine: A storage engine to store key-value pairs.
EngineDir/EngineFile: Simple wrappers to manage different engines in a directory.
UUID: The ID of an engine; different clients can operate on the same engine concurrently.
Token: The ID of a client; it is bound to a UUID so the client can write to the engine in streaming mode.
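As a rough illustration of those relationships (names like `KvImporterSketch` and `alloc_token` are made up for this sketch, not the PR's API): tokens come from an atomic counter, and each token is bound to the `Arc`-shared engine that its client writes to.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

type Token = usize;
type Uuid = u128;

struct Engine; // stand-in for the real storage engine

// Hypothetical sketch of how tokens, UUIDs, and engines relate.
struct KvImporterSketch {
    next_token: AtomicUsize,
    engines: HashMap<Uuid, Arc<Engine>>,  // UUID -> engine
    clients: HashMap<Token, Arc<Engine>>, // token -> engine the client writes to
}

impl KvImporterSketch {
    // Each client gets a fresh token from the atomic counter.
    fn alloc_token(&self) -> Token {
        self.next_token.fetch_add(1, Ordering::SeqCst)
    }

    // Bind a token to an engine; panics if the UUID is unknown (sketch only).
    fn attach(&mut self, token: Token, uuid: Uuid) {
        let engine = Arc::clone(&self.engines[&uuid]);
        self.clients.insert(token, engine);
    }
}
```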

Member

@breezewish breezewish left a comment


rest LGTM


pub struct KVImporter {
dir: EngineDir,
token: AtomicUsize,
Member

I suggest renaming it to something more obvious, e.g. clientId.

Contributor Author

We use this token name in several places in TiKV, so I think it is fine.

Member

Ok

self.token.fetch_add(1, Ordering::SeqCst)
}

fn insert(&self, token: Token, engine: Arc<EngineFile>) {
Member

The function name is misleading. It can easily be misinterpreted as something related to the KvEngine's data insert.

Contributor Author

Well, HashMap has an insert method too. KVImporter is a struct that maintains the relationship between clients and engines; it has nothing to do with data insert/remove.

Member

For a HashMap's insert, we know exactly what it inserts; when it is interpreted as "insert into HashMap", the purpose is obvious. However, for KVImporter's insert, we usually have no idea unless we take a look at the implementation. I mean, "insert into KVImporter" is confusing, because we neither know what KVImporter really does, nor what insert really does, according to their names. I suggest either documenting at least one of them with a comment, or changing the name of the function so that it is self-explaining.

assert!(inner.clients.insert(token, engine).is_none());
}

pub fn remove(&self, token: Token) -> Option<Arc<EngineFile>> {
Member

Ditto. The function name is misleading.

}

pub fn write(&self, token: Token, batch: WriteBatch) -> Result<usize> {
match self.remove(token) {
Member

It's weird that write contains a detach operation.

Contributor Author

We remove it here so we don't need to hold the lock while writing to the engine.

Member

Is it an implication that a token can be only used to write once?

Contributor Author

No, a token can be used to write multiple times, that's why we insert it back after a successful write.
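A minimal sketch of that remove/write/insert-back pattern (the `Importer` and `EngineFile` types here are simplified stand-ins, not the PR's code): the engine is detached under the lock, written to with no lock held, and re-attached on success so the same token can write again.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Token = usize;

struct EngineFile {
    bytes_written: Mutex<usize>, // stand-in for the real engine state
}

struct Importer {
    clients: Mutex<HashMap<Token, Arc<EngineFile>>>,
}

impl Importer {
    fn write_engine(&self, token: Token, batch: &[u8]) -> Result<usize, String> {
        // Detach the engine under the lock; the guard is dropped immediately.
        let engine = self
            .clients
            .lock()
            .unwrap()
            .remove(&token)
            .ok_or("unknown token")?;
        // Write without holding the clients lock.
        *engine.bytes_written.lock().unwrap() += batch.len();
        let size = batch.len();
        // Re-attach on success so the token remains usable.
        self.clients.lock().unwrap().insert(token, engine);
        Ok(size)
    }
}
```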

Member

Ok

Ok(engine) => engine,
Err(engine) => {
inner.engines.insert(uuid, engine);
return Err(Error::EngineInUse(uuid));
Member

The usage of the engine is actually tracked by whether or not there are references to the engine. However, semantically they are not exactly the same concept. Are there better solutions?

Contributor Author

Well, it is exactly the same concept. When a client writes to it, it acquires a reference; when the client finishes the write, it releases the reference. So if there are no references to the engine, no one is using the engine, and that's exactly what I want to check here.

Member
@breezewish breezewish Apr 2, 2018

How can you ensure that "when the client finishes the write, it releases the reference"? If this is only guaranteed by the implementation, I think it is somewhat weak, because there are no strong restrictions. If it is guaranteed by the interface, I accept it.

Contributor Author

When a client finishes a write stream, or any error occurs, the stream will be closed and the reference will be released. And if the client doesn't close all streams, it can't close the engine; that's a protocol between the client and the server, documented here: https://github.com/huachaohuang/kvproto/blob/import-1/proto/importpb.proto#L22.
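One way to see how the reference check works is through `Arc::try_unwrap`, which succeeds only when the caller holds the last reference. This is a simplified sketch of the idea, not the PR's actual close path:

```rust
use std::sync::Arc;

#[derive(Debug)]
struct Engine(&'static str);

// Close succeeds only when no other clone (i.e. no active write stream)
// still holds the engine. On failure the Arc is handed back so it can be
// re-inserted into the map, mirroring the EngineInUse error path.
fn close_engine(engine: Arc<Engine>) -> Result<Engine, Arc<Engine>> {
    Arc::try_unwrap(engine)
}
```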

Member

Ok

Contributor

If we can ensure that each client uses a unique engine (identified by UUID), then only the same client can close the engine. If the client closes the engine, the following writes will fail, which I think is acceptable. So I think we can close the engine directly here, with no need to check whether it is in use any more.

Contributor Author

Not really. Different clients from different machines can write to the same engine concurrently, and any client can close the engine if it knows that all writes are done. Moreover, I would have to use an unsafe way to close an engine held in an Arc if I didn't check the reference count.

@siddontang siddontang added the priority/critical Priority: Critical label Apr 2, 2018
@huachaohuang
Contributor Author

@breeswish I have changed the function name and added tests, PTAL, thanks.

@huachaohuang
Contributor Author

@zhangjinpeng1987 PTAL

Member
@breezewish breezewish left a comment

rest LGTM

Cargo.toml Outdated
@@ -82,7 +82,8 @@ git = "https://github.com/pingcap/murmur3.git"
git = "https://github.com/pingcap/rust-rocksdb.git"

[dependencies.kvproto]
git = "https://github.com/pingcap/kvproto.git"
git = "https://github.com/huachaohuang/kvproto.git"
Member

Needs update?

Contributor Author

Wait until I get two LGTMs.

@@ -442,6 +442,7 @@
# key-path = ""

[import]
# import-dir = "/tmp/tikv/import"
Member

Please add a comment for this field.

pub fn open_engine(&self, uuid: Uuid) -> Result<()> {
let mut inner = self.inner.lock().unwrap();
if inner.engines.contains_key(&uuid) {
return Ok(());
Member

Is it proper to return Ok if the engine is already opened using a uuid?

Contributor Author

Yes, clients may want to open the engine concurrently.

impl Engine {
pub fn new<P: AsRef<Path>>(path: P, uuid: Uuid, opts: DbConfig) -> Result<Engine> {
let db = {
let (db_opts, cf_opts) = tune_dboptions_for_bulk_load(&opts);
Contributor

do you only import to the default CF?

Contributor Author

No, the data for the write CF will be generated from the default CF when we actually import the SST files.

Contributor

So there is no need to change the Write CF options here?

Contributor Author
@huachaohuang huachaohuang Apr 3, 2018

Yes, no need.

}

pub fn write(&self, mut batch: WriteBatch) -> Result<usize> {
let wb = RawBatch::new();
Contributor

Can you create it with a predefined capacity?


/// Write to the attached engine.
pub fn write_engine(&self, token: Token, batch: WriteBatch) -> Result<usize> {
match self.remove_client(token) {
Contributor

Why remove and insert the client every time?

Contributor Author

To avoid holding the lock while the client is writing to the engine.

@tikv tikv deleted a comment from breezewish Apr 3, 2018
@tikv tikv deleted a comment from breezewish Apr 3, 2018

use super::{ImportKVService, KVImporter};

const MAX_GRPC_MSG_LEN: usize = 32 * 1024 * 1024;
Member

Should it be configurable?

Contributor Author

It doesn't seem necessary for now.

let batch_size = import1.write_engine(token, batch)?;
IMPORT_WRITE_CHUNK_BYTES.observe(batch_size as f64);
}
IMPORT_WRITE_CHUNK_DURATION.observe(start.elapsed_secs());
Member

Histogram has timers.

Contributor Author

Yep, but I only want to record the time of successful writes.
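That pattern, observing the duration only when every write succeeds, can be sketched with a plain `Instant` (the helper below is hypothetical; the PR uses Prometheus histograms, whose built-in timers observe unconditionally on drop):

```rust
use std::time::Instant;

// Run all writes, then report the elapsed time only if none of them failed.
// `observe_secs` stands in for a histogram's observe() call.
fn timed_writes<E>(
    batches: &[usize],
    mut write: impl FnMut(usize) -> Result<usize, E>,
    mut observe_secs: impl FnMut(f64),
) -> Result<(), E> {
    let start = Instant::now();
    for &b in batches {
        write(b)?; // on error we return early and record nothing
    }
    observe_secs(start.elapsed().as_secs_f64());
    Ok(())
}
```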

}

/// Write to the attached engine.
pub fn write_engine(&self, token: Token, batch: WriteBatch) -> Result<usize> {
Member

How about get_engine? So we can avoid lock-removal and lock-insertion over and over again.

Contributor Author

Any hint about how to do that? I have no idea how to carry the engine outside.

Member

Basically, I think it should look like:

let engine = {
  let mut inner = self.inner.lock().unwrap();
  inner.clients.get(&token).cloned()
};

stream.fold(engine, |engine| { ... });

Contributor Author

Thanks for your advice, the code looks simpler now, PTAL.
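The resulting shape is roughly the following (simplified types and assumed names, not the merged code): the clients lock is taken only long enough to clone the `Arc`, and the write stream then runs against the clone.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Token = usize;
struct EngineFile; // stand-in for the real engine wrapper

struct Importer {
    clients: Mutex<HashMap<Token, Arc<EngineFile>>>,
}

impl Importer {
    // Clone the Arc under a short-lived lock; the caller can then write
    // through the clone without touching the clients map again.
    fn get_engine(&self, token: Token) -> Option<Arc<EngineFile>> {
        self.clients.lock().unwrap().get(&token).cloned()
    }
}
```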

@@ -299,6 +299,15 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec
}
}

fn run_import_server(cfg: &TiKvConfig) {
Member

You may want to enable metrics.

pub num_threads: usize,
pub stream_channel_window: usize,
}

impl Default for Config {
fn default() -> Config {
Config {
import_dir: "/tmp/tikv/import".to_owned(),
Member

Why not data-dir/import?

Contributor Author

There is no data-dir here, and it is conceptually not related to the tikv data-dir.

let wb = RawBatch::with_capacity(wb_cap);
let commit_ts = batch.get_commit_ts();
for m in batch.take_mutations().iter_mut() {
let k = Key::from_raw(m.get_key()).append_ts(commit_ts);
Member

Could you reuse the k?

Contributor Author

What does that mean? The key is different every time.

Member

Reuse its allocation.

Contributor Author

It doesn't seem like we have a way to reuse it now; we can optimize this in the future if necessary.
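For reference, the kind of allocation reuse being suggested might look like the sketch below (a hypothetical helper, not the PR's code). It works because RocksDB's `WriteBatch::put` copies the bytes it is given, so one scratch buffer can be rebuilt per key; the timestamp suffix here is simplified compared to TiKV's real key encoding.

```rust
// Rebuild each timestamped key in a single reusable buffer instead of
// allocating a fresh Vec per mutation. `put` models a sink that copies
// the bytes it receives (as RocksDB's WriteBatch::put does).
fn fill_batch(raw_keys: &[&[u8]], ts: u64, mut put: impl FnMut(&[u8])) {
    let mut buf = Vec::new();
    for raw in raw_keys {
        buf.clear(); // reuse the allocation across iterations
        buf.extend_from_slice(raw);
        buf.extend_from_slice(&ts.to_be_bytes()); // simplified ts suffix
        put(&buf);
    }
}
```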

@@ -350,7 +363,7 @@ fn overwrite_config_with_cmd_args(config: &mut TiKvConfig, matches: &ArgMatches)
config.raft_store.capacity = capacity;
}

if matches.is_present("import-mode") {
if matches.is_present("import-mode") || matches.is_present("import-server") {
Contributor
@siddontang siddontang Apr 3, 2018

What is the difference between import-mode and import-server?

Contributor Author

import-mode tunes some configs automatically, while import-server runs as an import server, which has nothing to do with TiKV. Don't pay too much attention to this for now; I will try to move it elsewhere and build the import server as a standalone binary in the next PR.

Member
@overvenus overvenus left a comment

LGTM

@huachaohuang
Contributor Author

/run-integration-tests

@huachaohuang
Contributor Author

/run-integration-tests

@huachaohuang huachaohuang merged commit 6f5e9d2 into tikv:master Apr 4, 2018
@huachaohuang huachaohuang deleted the import-1 branch April 4, 2018 09:30
sticnarf pushed a commit to sticnarf/tikv that referenced this pull request Oct 27, 2019
Labels
priority/critical Priority: Critical