feat: State sync from local filesystem #8913
Conversation
…om the local filesystem
Can I review it this week?
Ready for review.
```rust
                *lock = Some(Err(err.to_string()));
            }
        }
        let mut lock = download_response.lock().unwrap();
```
what is this lock used for?
In the case of fetching data from S3, the data is written by one of many different threads managed by actix, so this access needs to be synchronized.
this may be stupid, but wouldn't the different threads be writing to different locations? Why would they need to be synchronized?
Yet another thread checks whether the writes are complete, so this is a read-write race. See `fn check_external_storage_part_response()`; that function also clears the response after saving it to RocksDB.
The way timeouts are handled can also be problematic. If a task was spawned to fetch an object but hasn't finished within 60 seconds, a new task is spawned and the old task isn't cancelled. That is a write-write race.
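To make the shape of that pattern concrete, here is a minimal, self-contained sketch; the type alias, function names, and payload are simplified stand-ins for illustration, not the actual nearcore code:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Simplified stand-in: the fetched state part (or an error string) lives behind a
// Mutex so that any of the downloader threads can write it and a separate checker
// thread can read it and clear it.
type DownloadResponse = Arc<Mutex<Option<Result<Vec<u8>, String>>>>;

fn downloader(response: DownloadResponse, data: Result<Vec<u8>, String>) {
    // One of many worker threads stores its result into the shared slot.
    let mut lock = response.lock().unwrap();
    *lock = Some(data);
}

fn check_response(response: &DownloadResponse) -> Option<Result<Vec<u8>, String>> {
    // The checker takes the response, clearing the slot, similar in spirit to
    // what check_external_storage_part_response() does before saving to RocksDB.
    response.lock().unwrap().take()
}

fn main() {
    let response: DownloadResponse = Arc::new(Mutex::new(None));
    let writer = {
        let response = Arc::clone(&response);
        thread::spawn(move || downloader(response, Ok(vec![1, 2, 3])))
    };
    writer.join().unwrap();
    assert!(check_response(&response).is_some());
}
```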
hmm I think I probably missed this point in the last review, but that feels a little strange to me. Is it not possible to coordinate the tasks so that we don't start a new one when there's already one running for that part? Each task could write its state part to a Vec allocated for it, and when everybody is done we signal the main thread that the results are ready, so nobody needs a lock anywhere. Unless that's not possible here for some reason I'm missing.
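Read as a standalone sketch, that proposal might look roughly like the following; the channel, part contents, and thread usage are illustrative assumptions rather than the PR's actual code:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let num_parts: usize = 4;
    let (tx, rx) = mpsc::channel();

    // Spawn one task per part; each sends its (part_id, bytes) exactly once.
    for part_id in 0..num_parts {
        let tx = tx.clone();
        thread::spawn(move || {
            let part = vec![part_id as u8; 8]; // stand-in for the fetched state part
            tx.send((part_id, part)).unwrap();
        });
    }
    drop(tx); // close the channel so the collector stops after the last send

    // Collect results into per-part slots; no Mutex is needed because each slot
    // is written exactly once, and only by the collecting thread.
    let mut parts: Vec<Option<Vec<u8>>> = vec![None; num_parts];
    for (part_id, part) in rx {
        parts[part_id] = Some(part);
    }
    assert!(parts.iter().all(|p| p.is_some()));
}
```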
I agree that a mutex on every Vec is sub-optimal, but changing this type is a bigger change. When the Vec gets a response, that needs to be communicated to somebody somehow. Maybe ShardSyncDownload needs a new AtomicI64 that counts the number of requests still alive.
The current mechanism happened to exist already, to send out part requests to the peers, and those requests are async.
For external storage, a rayon loop could be a simpler approach, or we could use SyncJobsActor to fetch state parts.
Let me get back to it in a follow-up PR.
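A rough illustration of the counter idea; the `live_requests` name and its placement are hypothetical, not an existing ShardSyncDownload field:

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let num_requests = 8;
    // Hypothetical counter of requests still alive, as suggested above.
    let live_requests = Arc::new(AtomicI64::new(num_requests));

    let handles: Vec<_> = (0..num_requests)
        .map(|_| {
            let live_requests = Arc::clone(&live_requests);
            thread::spawn(move || {
                // ... fetch and store one state part ...
                live_requests.fetch_sub(1, Ordering::SeqCst);
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    // Once the counter reaches zero, whoever polls the download state knows
    // that all responses have landed, without holding a lock.
    assert_eq!(live_requests.load(Ordering::SeqCst), 0);
}
```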
config_validate.rs logic looks good to me. Left some other comments.
Looks good to me, with 2 minor questions.
```
@@ -59,10 +68,11 @@ pub fn spawn_state_sync_dump(
        runtime.num_shards(&epoch_id)
    }?;

    let chain_id = client_config.chain_id.clone();
    let keep_running = Arc::new(AtomicBool::new(true));
    // Start a thread for each shard.
    let handles = (0..num_shards as usize)
```
not a problem right now, but in a future world where we have 1000 shards or something, one `actix_rt::Arbiter::new()` per shard is probably not what we want
Acknowledged
nearcore/src/state_sync.rs (Outdated)
```rust
}

impl Drop for StateSyncDumpHandle {
    fn drop(&mut self) {
        self.keep_running.store(false, std::sync::atomic::Ordering::Relaxed);
```
what is the intended behavior here?
If I go:
```rust
fn make_node() -> Addr<ClientActor> {
    let NearNode { client, .. } = nearcore::start_with_config(..);
    // everything else in NearNode (including state_sync_dump_handle) is dropped here
    client
}

fn main() {
    let client = make_node();
    do_stuff(client);
}
```
Then the node keeps running as usual but dumping state has stopped, which seems a bit unexpected
Suggestions welcome.
What you're describing seems acceptable. If the user needs these threads to keep working, the user needs to hold on to the `state_sync_dump_handle`.
what would be wrong with just not changing `keep_running` on drop? Is this behavior something we explicitly want for some reason? Could make it work like the join handle returned from `std::thread::spawn()`, where dropping it does nothing and just means that you don't have a handle to it anymore.
The problem I'm solving is that the loop isn't interruptible while generating parts in `for part_id in parts_dumped..num_parts {`.
Tested that locally:
- Ignore `keep_running`.
- Replace the for-loop with an infinite loop obtaining `part_id=0`.
- Run a localnet node that dumps state to `/tmp/`.
- Hit Ctrl+C: it stops the actix actors and disables logging, but the loop keeps running, writing new files over and over.

If dumping state takes only a short time, the dumping threads respond well to interrupts. `keep_running` stops that long-running loop and makes the threads interruptible.

> Could make it work like the join handle returned from std::thread::spawn() where dropping it does nothing, and just means that you dont have a handle to it anymore

That would be problematic, because the for-loop would keep running.
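For illustration, a cut-down sketch of the interruptible loop being described; the function name and the part-writing step are placeholders, assuming the per-shard loop simply re-checks the flag between parts:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Sketch only: the real loop dumps one state part to external storage per iteration.
fn dump_parts(keep_running: &AtomicBool, parts_dumped: u64, num_parts: u64) {
    for part_id in parts_dumped..num_parts {
        if !keep_running.load(Ordering::Relaxed) {
            // The handle flipped the flag (e.g. on Ctrl+C); stop promptly
            // instead of grinding through the remaining parts.
            return;
        }
        // ... obtain and write part `part_id` ...
        let _ = part_id;
    }
}

fn main() {
    let keep_running = Arc::new(AtomicBool::new(true));
    dump_parts(&keep_running, 0, 10);
}
```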
Yeah but I think the behavior you describe occurs because `StateSyncDumpHandle::stop()` doesn't set `keep_running`, not because we need it to be set in `drop()`. So if you add that, it should work as you expect it to. And honestly it feels to me like `StateSyncDumpHandle::stop()` should probably set that variable, right?
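Sketched with a stand-in struct (the real StateSyncDumpHandle carries more state than this), the suggestion amounts to something like:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Stand-in for the real handle; only the flag relevant to this thread is shown.
struct StateSyncDumpHandle {
    keep_running: Arc<AtomicBool>,
}

impl StateSyncDumpHandle {
    fn stop(&self) {
        // Explicit stop sets the flag, so callers don't have to rely on Drop.
        self.keep_running.store(false, Ordering::Relaxed);
    }
}

impl Drop for StateSyncDumpHandle {
    fn drop(&mut self) {
        // Dropping the handle also stops the dump threads, matching the PR's behavior.
        self.stop();
    }
}

fn main() {
    let handle = StateSyncDumpHandle { keep_running: Arc::new(AtomicBool::new(true)) };
    handle.stop();
    assert!(!handle.keep_running.load(Ordering::Relaxed));
}
```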
man... github has a bug in reviews.... this happened last time too. Quite a shocking bug tbh unless I am just doing something stupid. So posting here the comments I posted that don't show up in an incognito window for me:
- chain/client/src/sync/state.rs line 192: why
- chain/client/src/sync/state.rs line 993: I think in the last PR that added
- chain/client/src/sync/state.rs line 150: throughout this function, info! feels like it's probably a bit too verbose. There are many thousands of parts in mainnet right?
- chain/client/src/sync/state.rs line 1509: why not just call this function there then?
- docs/misc/state_sync_from_s3.md line 11: so we are saying it's not temporary anymore?
- docs/misc/state_sync_from_s3.md line 36: should we still tell people that you should set these if you want to dump the parts though?
- integration-tests/src/tests/nearcore/sync_state_nodes.rs line 405: when running this test, I get a lot of logspam with (…); also I got this failure, logs say: (…). This error happens like every time I run it, so at the moment I think this will probably be failing on nayduck if merged as is. Let me know if you can't reproduce it on your end and I can debug it on my computer.
- integration-tests/src/tests/nearcore/sync_state_nodes.rs line 430: why Arc? same with dir2 below
- nearcore/src/config_validate.rs line 100: nit:
- core/chain-configs/src/client_config.rs line 87: nit: u64 is kinda big for this right?
Yes, last time was awkward, this time I see all the comments you quoted.
```rust
/// Start the second node which gets state parts from that temp directory.
#[test]
#[cfg_attr(not(feature = "expensive_tests"), ignore)]
fn sync_state_dump() {
```
yeah, `debug!` sounds good. Although I don't think it's because of `std::thread::sleep(iteration_delay)`, since `nanosleep` should sleep for at least as long as you ask it to. Actually I think the frequency of it was because of the different threads all calling `check_new_epoch()` at the same time. No objections to using `actix_rt::time::sleep()` though. Don't think it matters much in this case which one you use, but might as well.
And this test still fails for me every time with the same error I pasted. Does it pass for you? Maybe it's platform-dependent for some reason.
Fixed and checked that it works on nayduck.
Approving to unblock.
Also adds a unit test for state dump and an integration test for state dump and state sync from that dump.