Skip to content
This repository has been archived by the owner on Dec 29, 2022. It is now read-only.

Commit

Permalink
Merge pull request #923 from matklad/managed-concurrency
Browse files Browse the repository at this point in the history
Managed concurrency
  • Loading branch information
nrc authored Jul 18, 2018
2 parents b7318f0 + 833a2a9 commit f7b4a9b
Show file tree
Hide file tree
Showing 13 changed files with 280 additions and 84 deletions.
62 changes: 62 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ url = "1.1.0"
walkdir = "2.1"
regex = "1"
ordslice = "0.3"
crossbeam-channel = "0.2.1"

[dev-dependencies]
json = "0.11"
Expand Down
14 changes: 14 additions & 0 deletions src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::build::*;
use crate::lsp_data;
use crate::lsp_data::*;
use crate::server::Output;
use crate::concurrency::{ConcurrentJob, Jobs};

use std::collections::HashMap;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -150,6 +151,7 @@ pub struct InitActionContext {
prev_changes: Arc<Mutex<HashMap<PathBuf, u64>>>,

config: Arc<Mutex<Config>>,
jobs: Arc<Mutex<Jobs>>,
client_capabilities: Arc<lsp_data::ClientCapabilities>,
client_supports_cmd_run: bool,
/// Whether the server is performing cleanup (after having received
Expand Down Expand Up @@ -199,6 +201,7 @@ impl InitActionContext {
analysis_queue,
vfs,
config,
jobs: Arc::new(Mutex::new(Jobs::new())),
current_project,
previous_build_results: Arc::new(Mutex::new(HashMap::new())),
build_queue,
Expand Down Expand Up @@ -238,6 +241,8 @@ impl InitActionContext {
}

fn build<O: Output>(&self, project_path: &Path, priority: BuildPriority, out: &O) {
let (job, token) = ConcurrentJob::new();
self.add_job(job);

let pbh = {
let config = self.config.lock().unwrap();
Expand All @@ -253,6 +258,7 @@ impl InitActionContext {
use_black_list: config.use_crate_blacklist,
notifier: Box::new(BuildDiagnosticsNotifier::new(out.clone())),
blocked_threads: vec![],
token,
}
};

Expand All @@ -267,6 +273,14 @@ impl InitActionContext {
self.build(&self.current_project, priority, out);
}

pub fn add_job(&self, job: ConcurrentJob) {
self.jobs.lock().unwrap().add(job);
}

pub fn wait_for_concurrent_jobs(&self) {
self.jobs.lock().unwrap().wait_for_all();
}

/// Block until any builds and analysis tasks are complete.
fn block_on_build(&self) {
self.build_queue.block_on_build();
Expand Down
5 changes: 4 additions & 1 deletion src/actions/post_build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use std::thread::{self, Thread};
use crate::actions::diagnostics::{parse_diagnostics, Diagnostic, ParsedDiagnostics, Suggestion};
use crate::actions::progress::DiagnosticsNotifier;
use crate::build::BuildResult;
use crate::lsp_data::PublishDiagnosticsParams;
use crate::concurrency::JobToken;
use languageserver_types::DiagnosticSeverity;
use itertools::Itertools;
use crate::lsp_data::PublishDiagnosticsParams;

use rls_analysis::AnalysisHost;
use rls_data::Analysis;
Expand All @@ -46,6 +47,8 @@ pub struct PostBuildHandler {
pub active_build_count: Arc<AtomicUsize>,
pub notifier: Box<dyn DiagnosticsNotifier>,
pub blocked_threads: Vec<thread::Thread>,
#[allow(unused)] // for drop
pub token: JobToken,
}

impl PostBuildHandler {
Expand Down
1 change: 0 additions & 1 deletion src/build/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ impl BuildQueue {
if needs_compilation_ctx_from_cargo {
priority = BuildPriority::Cargo;
}

let build = PendingBuild {
build_dir: new_build_dir.to_owned(),
built_files: self.internals.dirty_files.lock().unwrap().clone(),
Expand Down
100 changes: 100 additions & 0 deletions src/concurrency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use std::{thread};

use crossbeam_channel::{bounded, Receiver, Sender};

/// `ConcurrentJob` is a handle for some long-running computation
/// off the main thread. It can be used, indirectly, to wait for
/// the completion of the said computation.
///
/// All `ConncurrentJob`s must eventually be stored in a `Jobs` table.
///
/// All concurrent activities, like spawning a thread or pushing
/// a work item to a job queue, should be covered by `ConcurrentJob`.
/// This way, the set of `Jobs` table will give a complete overview of
/// concurrency in the system, and it will be possinle to wait for all
/// jobs to finish, which helps tremendously with making tests deterministic.
///
/// `JobToken` is the worker-side counterpart of `ConcurrentJob`. Dropping
/// a `JobToken` signals that the corresponding job has finished.
#[must_use]
pub struct ConcurrentJob {
chan: Receiver<Never>,
}

pub struct JobToken {
#[allow(unused)] // for drop
chan: Sender<Never>,
}

pub struct Jobs {
jobs: Vec<ConcurrentJob>,
}

impl Jobs {
pub fn new() -> Jobs {
Jobs { jobs: Vec::new() }
}

pub fn add(&mut self, job: ConcurrentJob) {
self.gc();
self.jobs.push(job);
}

/// Blocks the current thread until all pending jobs are finished.
pub fn wait_for_all(&mut self) {
while !self.jobs.is_empty() {
let done: usize = {
let chans = self.jobs.iter().map(|j| &j.chan);
select! {
recv(chans, msg, from) => {
assert!(msg.is_none());
self.jobs.iter().position(|j| &j.chan == from).unwrap()
}
}
};
drop(self.jobs.swap_remove(done));
}
}

fn gc(&mut self) {
self.jobs.retain(|job| !job.is_completed())
}
}

impl ConcurrentJob {
pub fn new() -> (ConcurrentJob, JobToken) {
let (tx, rx) = bounded(0);
let job = ConcurrentJob { chan: rx };
let token = JobToken { chan: tx };
(job, token)
}

fn is_completed(&self) -> bool {
is_closed(&self.chan)
}
}

impl Drop for ConcurrentJob {
fn drop(&mut self) {
if self.is_completed() || thread::panicking() {
return;
}
panic!("orphaned concurrent job");
}
}

// We don't actually send messages through the channels,
// and instead just check if the channel is closed,
// so we use uninhabited enum as a message type
enum Never {}

/// Nonblocking
fn is_closed(chan: &Receiver<Never>) -> bool {
select! {
recv(chan, msg) => match msg {
None => true,
Some(never) => match never {}
}
default => false,
}
}
3 changes: 3 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ extern crate log;
extern crate serde_derive;
#[macro_use]
extern crate serde_json;
#[macro_use]
extern crate crossbeam_channel;

use std::env;
use std::sync::Arc;
Expand All @@ -57,6 +59,7 @@ pub mod cmd;
pub mod config;
pub mod lsp_data;
pub mod server;
pub mod concurrency;

#[cfg(test)]
mod test;
Expand Down
Loading

0 comments on commit f7b4a9b

Please sign in to comment.