-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
server: direct send raft message to grpc client. #1882
Conversation
# Conflicts: # Cargo.lock # src/server/config.rs # src/server/conn.rs # src/server/coprocessor/endpoint.rs # src/server/metrics.rs # src/server/server.rs
* *: update grpc-rs.
# Conflicts: # Cargo.lock
# Conflicts: # Cargo.lock # src/server/kv.rs # src/server/snap.rs
src/server/transport.rs
Outdated
if !msg.get_message().has_snapshot() { | ||
let addr = self.raft_client.rl().addrs.get(&to_store_id).map(|x| x.to_owned()); | ||
if let Some(addr) = addr { | ||
if let Err(e) = self.raft_client.wl().send(to_store_id, addr.to_owned(), msg) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why to_owned twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should consider the case of address outdated if the remote store changes its address, although this may be a rare case.
Or if we can't connect remote store many times, do we need to get the address from PD again?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why to_owned twice?
Mistake.
should consider the case of address outdated if the remote store changes its address, although this may be a rare case.
When fail to send a message, the cached address will be removed, then it will be resolved again when next message comes.
LGTM PTAL @BusyJay |
If addresses is stored as |
src/raftstore/store/store.rs
Outdated
@@ -636,6 +636,11 @@ impl<T: Transport, C: PdClient> Store<T, C> { | |||
} | |||
|
|||
fn on_raft_message(&mut self, mut msg: RaftMessage) -> Result<()> { | |||
let store_id = msg.get_to_peer().get_store_id(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
following is_raft_msg_valid already checks this, and here return this error makes no sense, we will only log this error outside.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, I'll remove it.
Great work 👍 |
PTAL @BusyJay |
src/server/transport.rs
Outdated
Ok(()) | ||
fn send_store(&self, store_id: u64, msg: RaftMessage) { | ||
// check the corresponding token for store. | ||
let addr = self.raft_client.rl().addrs.get(&store_id).map(|x| x.to_owned()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use get().cloned()
directly?
src/bin/tikv-server.rs
Outdated
rocksdb_util::CFOptions::new(CF_RAFT, | ||
get_rocksdb_raftlog_cf_option(config, total_mem))]; | ||
let mut db_path = path.clone(); | ||
db_path.push("db"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kind of strange usage.
let top_dir = Path::new(&cfg.storage.path);
let lock_path = top_dir.join("LOCK");
...
let db_dir = top_dir.join("db");
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also snap_dir
, or to say snap_path
# Conflicts: # tests/raftstore/test_bootstrap.rs
# Conflicts: # src/bin/tikv-server.rs
any update? @disksing |
# Conflicts: # etc/config-template.toml # src/bin/tikv-server.rs # src/server/config.rs # src/server/raft_client.rs # src/server/server.rs # tests/raftstore/server.rs
# Conflicts: # Cargo.lock
please fix the conflict and merge to master, not grpc branch. |
src/server/transport.rs
Outdated
@@ -81,14 +78,13 @@ impl RaftStoreRouter for ServerRaftStoreRouter { | |||
} | |||
|
|||
fn send_raft_msg(&self, msg: RaftMessage) -> RaftStoreResult<()> { | |||
let store_id = msg.get_to_peer().get_store_id(); | |||
try!(self.validate_store_id(store_id)); | |||
if msg.get_region_id() == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to check in grpc_service.rs? So we don't add the counter in metric.
src/server/server.rs
Outdated
let addr = try!(SocketAddr::from_str(&cfg.addr)); | ||
let ip = format!("{}", addr.ip()); | ||
let channel_args = ChannelBuilder::new(env.clone()) | ||
.stream_initial_window_size(cfg.grpc_stream_initial_window_size) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why delete it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mistake.
# Conflicts: # Cargo.lock
PTAL @BusyJay @siddontang |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
LGTM |
Signed-off-by: zeminzhou <[email protected]>
No description provided.