When spawning many tasks, some tasks never run. #1388

Closed
DevQps opened this issue Aug 4, 2019 · 22 comments

Comments

@DevQps

DevQps commented Aug 4, 2019

Version

tokio = "0.1.22"

Platform

Windows 10

Description

Before I fill in the complete form, I'll first give a short description (I hope that's fine).

I am trying to scrape the Google Certificate Transparency logs using Tokio, hyper, h2, and the new async-await syntax. The problem arises when I spawn many requests over a single HTTP2 connection. It occurs at around 25 tasks, each looping the following pseudocode (a rough Rust sketch follows the list):

Worker:

  1. Retrieve the next index to call.
  2. Send a request (a bunch of awaits here)
  3. Wait for the response (a bunch of awaits here as well)
  4. Go back to 1.
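
A minimal, self-contained sketch of that loop in async Rust (the function names and step size here are illustrative, not the real project's):

use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the real request/response machinery.
async fn send_request(i: u64) -> u64 {
    i
}

// Atomically hand out the next index (mirrors increment_index in the full code below).
fn next_index(index: &Arc<Mutex<u64>>, step: u64) -> u64 {
    let mut guard = index.lock().unwrap();
    let i = *guard;
    *guard += step;
    i
}

async fn worker(index: Arc<Mutex<u64>>, max: u64) {
    loop {
        // 1. Retrieve the next index to call.
        let i = next_index(&index, 1000);
        if i > max {
            break;
        }
        // 2./3. Send the request and wait for the response.
        let _response = send_request(i).await;
        // 4. Go back to 1.
    }
}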

The behavior I see is that the first 10 tasks get up to point 3 but then never proceed to point 4. Other tasks spawned after that are able to complete the whole process.

Question: Can this be due to tokio's scheduling algorithm? If many tasks are spawned that all have work to do when polled, could some old tasks never be triggered again because newer tasks keep the runtime busy? If so: is there any workaround?

Thanks in advance! If this is not the case, I will rework my code into a minimal example and post it here!

@Aaron1011
Contributor

@DevQps: Are you able to post your current project?

@DevQps
Author

DevQps commented Aug 5, 2019

Hey Aaron,

I am sorry for not doing so before. I created a more or less minimal project to reproduce the bug. It's around 200 lines, but most of them set up the connection and spawn tasks, so it should not be too hard to follow. It contains many comments as well (hope that helps!)

Here it is:

#![feature(async_await)]

use std::sync::mpsc::{Receiver, SyncSender};
use std::sync::{mpsc, Arc, Mutex};

use tokio::runtime::Runtime;
use tokio::prelude::*;
use rustls::ClientConfig;
use tokio_rustls::ClientConfigExt;
use webpki::DNSNameRef;
use tokio::net::TcpStream;

use futures::compat::Future01CompatExt;
use futures::future::{FutureExt, TryFutureExt};

use future_old::future::Future;
use rustls::Session;

use std::net::ToSocketAddrs;
use hyper::{Method, Request};

#[derive(Debug, Clone, Default)]
pub struct Log
{
    /// The description of the CT log.
    pub description: String,

    /// TODO: Unknown what this field means.
    pub key: String,

    /// The URL through which the Certificate Transparency Log should be retrieved.
    pub url: String,

    /// The maximum time between the moment that a certificate has been accepted and pushed to
    /// the log.
    pub maximum_merge_delay: u64,

    /// The list of operators that operate this log.
    pub operated_by: Vec<String>,

    /// The maximum amount of certificates we can retrieve per HTTP call. Default: 64.
    pub step_size: u64,
}

/// A struct that holds the data required to scrape a Certificate Transparency log.
#[derive(Clone)]
pub struct CTScanner
{
    /// The log that this CTScanner will scrape.
    pub log: Log,

    /// The amount of parallel HTTP2 connections that will be spawned for scraping.
    pub connections: u32,

    /// The amount of tasks that will be generated over each HTTP2 connection.
    pub tasks: u32,
}

impl CTScanner
{
    pub fn new(log: Log, connections: u32, tasks: u32) -> CTScanner
    {
        CTScanner {
            log: log,
            connections,
            tasks,
        }
    }

    /// Scans a Certificate Transparency log.
    pub async fn scan(self, index: u64)
    {
        // The index that is atomically updated on each thread.
        let index: Arc<Mutex<u64>> = Arc::new(Mutex::new(index));

        // For now set the maximum tree_size manually. This should be changed.
        let tree_size = 7842537;

        for _ in 0..self.connections
            {
                // Clone the log such that it can be accessed without performance overhead.
                let log = self.log.clone();

                // Spawn an HTTP2 connection that will query the CT logs.
                tokio::spawn(CTScanner::connect(self.tasks, log, index.clone(), tree_size).unit_error().boxed().compat());
            }
    }

    async fn connect(tasks: u32, log: Log, index: Arc<Mutex<u64>>, max: u64)
    {
        // Set the address to run our socket on.
        let address = "ct.googleapis.com:443"
            .to_socket_addrs()
            .unwrap()
            .next()
            .unwrap();

        let config = CTScanner::setup_tls_config();

        let dns_name = DNSNameRef::try_from_ascii_str("ct.googleapis.com").unwrap();

        // Open a TCP connection.
        let tcp = TcpStream::connect(&address).compat().await.unwrap();

        // Create an HTTPS connection
        let tls = config.connect_async(dns_name, tcp).compat().await.unwrap();

        // Perform an ALPN exchange.
        let (_, session) = tls.get_ref();
        let negotiated_protocol = session.get_alpn_protocol();
        assert_eq!(Some("h2"), negotiated_protocol.as_ref().map(|x| &**x));

        // If HTTP2 is supported, open an HTTP2 connection.
        let (client, conn) = h2::client::handshake(tls).compat().await.unwrap();

        // Spawn the connection handler that maintains the connection.
        tokio::spawn(conn.map_err(|_| panic!("connection failed")));

        // For each task on the connection, spawn a request runner.
        for i in 0..tasks
            {
                let client = client.clone();
                tokio::spawn(CTScanner::request(client.clone(), log.clone(), index.clone(), max).unit_error().boxed().compat());
            }
    }

    async fn request(client: h2::client::SendRequest<bytes::Bytes>, log: Log, index: Arc<Mutex<u64>>, max: u64)
    {
        // Increment the index atomically.
        let mut i = CTScanner::increment_index(&index, log.step_size);

        while i <= max
            {
                let client = client.clone();

                // Construct the URL to scrape.
                let url: String = format!(
                    "https://{}ct/v1/get-entries?start={}&end={}",
                    log.url,
                    i,
                    i + log.step_size
                ).parse().unwrap();

                // Construct the request.
                let request = Request::builder()
                    .method(Method::GET)
                    .uri(url)
                    .body(())
                    .unwrap();

                // Send the request.
                let (response, _) = client.ready().compat().await.unwrap().send_request(request, true).unwrap();

                println!("Waiting for: {}", i);

                // Receive the response.
                let response = response.compat().await.unwrap();
                let (parts, mut body) = response.into_parts();

                // Create a release handle.
                let mut release = body.release_capacity().clone();

                body.for_each(|chunk|{
                    release.release_capacity(chunk.len());
                    Ok(())
                }).compat().await;

                println!("Queried: {}", i);

                // Increment the index atomically.
                i = CTScanner::increment_index(&index, log.step_size);
            }
    }

    fn setup_tls_config() -> Arc<ClientConfig>
    {
        std::sync::Arc::new({
            let mut c = rustls::ClientConfig::new();
            c.root_store
                .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
            c.alpn_protocols.push("h2".to_owned());
            c
        })
    }

    fn increment_index(index: &Arc<Mutex<u64>>, increment: u64) -> u64
    {
        let mut guard = index.lock().unwrap();
        let counter = *guard;
        *guard += increment;
        counter
    }
}

pub fn main()
{
    let log = Log {
        url: "ct.googleapis.com/logs/argon2017/".to_string(),
        step_size: 1000,
        ..Default::default()
    };

    // Create a scanner with 1 HTTP2 connection and 20 tasks.
    let scanner = CTScanner::new(log, 1, 20);
    tokio::run(scanner.scan(0).unit_error().boxed().compat());
}

Cargo.toml

[dependencies]
# Futures v0.1 compatibility.
futures-preview = { version = "=0.3.0-alpha.16", features = ["compat"] }

# Futures v0.1
future_old = {package = "futures", version = "0.1.28"}

hyper = "0.12.33"
tokio = "0.1.22"
bytes = "0.4.12"
h2 = "0.1.26"

# RustTLS
rustls = "0.12"
tokio-rustls = "0.5.0"
webpki = "0.18"
webpki-roots = "0.14"

Output:

Waiting for: 0
Waiting for: 1000
Waiting for: 2000
Waiting for: 3000
Waiting for: 4000
Waiting for: 5000
Waiting for: 6000
Waiting for: 7000
Waiting for: 8000
Waiting for: 9000
Waiting for: 10000
Waiting for: 11000
Waiting for: 12000
Waiting for: 13000
Waiting for: 14000
Waiting for: 15000
Waiting for: 16000
Waiting for: 17000
Waiting for: 18000
Waiting for: 19000
Queried: 19000
Waiting for: 20000
Queried: 20000
Waiting for: 21000
Queried: 18000
Waiting for: 22000
Queried: 21000
Waiting for: 23000
Queried: 17000
Waiting for: 24000
Queried: 16000
Waiting for: 25000
Queried: 22000
Waiting for: 26000
Queried: 24000
Waiting for: 27000
Queried: 26000
Waiting for: 28000
Queried: 25000
Waiting for: 29000
Queried: 23000
Waiting for: 30000
Queried: 15000
Waiting for: 31000
Queried: 30000
Waiting for: 32000
Queried: 29000
Waiting for: 33000
Queried: 31000
Waiting for: 34000
Queried: 28000
Waiting for: 35000
Queried: 27000
Waiting for: 36000
Queried: 36000
Waiting for: 37000
Queried: 35000
Waiting for: 38000
Queried: 34000
Waiting for: 39000
Queried: 33000
Waiting for: 40000
Queried: 14000
Waiting for: 41000
Queried: 32000
Waiting for: 42000
Queried: 13000
Waiting for: 43000
Queried: 42000
Waiting for: 44000
Queried: 41000
Waiting for: 45000
Queried: 43000
Waiting for: 46000
Queried: 40000
Waiting for: 47000
Queried: 44000
Waiting for: 48000
Queried: 39000
Waiting for: 49000
Queried: 45000
Waiting for: 50000
Queried: 46000
Waiting for: 51000
Queried: 49000
Waiting for: 52000
Queried: 48000
Waiting for: 53000
Queried: 47000
Waiting for: 54000
Queried: 38000
Waiting for: 55000
Queried: 51000
Waiting for: 56000
Queried: 52000
Waiting for: 57000
Queried: 50000
Waiting for: 58000
Queried: 37000
Waiting for: 59000
Queried: 54000
Waiting for: 60000
Queried: 55000
Waiting for: 61000
Queried: 53000
Waiting for: 62000
Queried: 57000
Waiting for: 63000
Queried: 61000
Waiting for: 64000
Queried: 63000
Waiting for: 65000

As you can see here: it reached the part where it actually sent the requests, but it seems the first tasks are never executed to completion anymore (tasks with index 0 through 12000). If you have more questions, please ask!

So my basic question is: Is this due to tokio's scheduling algorithm? (And do you have any tips and tricks on how I can fix this?) Or is this because of something else I did wrong?

P.S. If you cannot reproduce the problem, increasing the number of tasks in main might help! I was able to reproduce it on both my desktop and laptop.

Thanks!

@DevQps
Author

DevQps commented Aug 8, 2019

@Aaron1011 Hey Aaron! Were you able to take a look at it? Just wondering whether you missed my previous message because I didn't tag you!

@dbcfd
Contributor

dbcfd commented Aug 9, 2019

+1 on this. I'm seeing similar behavior where some tasks just stop getting run, especially in cases where there are more tasks than cores.

My next step is to adjust the runtime by setting a higher number of core threads to see if that alleviates the problem.
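
For reference, on tokio 0.1 that adjustment would look something like this sketch (the worker count is arbitrary, and this assumes the default threadpool runtime):

use tokio::runtime::Builder;

fn main() {
    // Build a runtime with more worker threads than the default
    // (tokio 0.1 defaults to one worker per logical core).
    let runtime = Builder::new()
        .core_threads(32) // arbitrary, larger-than-default count
        .build()
        .unwrap();

    // Drive the application via runtime.spawn(...) / runtime.block_on_all(...)
    // instead of tokio::run.
    drop(runtime);
}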

@Darksonn
Contributor

It could be Google disliking many requests at once.

@DevQps
Author

DevQps commented Aug 10, 2019 via email

@ntkoopman

I have a similar/the same issue, but I can only reproduce it using PollEvented and the current_thread executor:

[package]
name = "untitled"
version = "0.1.0"
edition = "2018"

[dependencies]
tun = { version = "0.4.4", features = ["mio"] }
tokio = "0.2.0-alpha.1"
#![feature(async_await)]

use std::error::Error;

use tokio;
use tokio::io::AsyncReadExt;
use tokio::reactor::PollEvented;
use tun;

#[tokio::main(single_thread)]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut config = tun::Configuration::default();
    config
        .address((10, 0, 0, 1))
        .netmask((255, 255, 255, 0))
        .up();
    let dev = tun::create(&config)?;
    let mut dev = PollEvented::new(dev);
    let mut buf = [0; 1024];
    loop {
        tokio::spawn(async { println!("spawn") });
        let size = dev.read(&mut buf).await?;
        println!("read: {}", size);
    }
}

Output:

spawn
read: 48
read: 76
read: 40
read: 48
read: 68
read: 64
...

Any future spawned before the first await on the PollEvented works fine, but afterwards it doesn't.

@carllerche
Member

What does CPU usage look like?

@ntkoopman

Usage is zero; strace shows it's actually blocking on a read:

epoll_wait(3, [{EPOLLOUT, {u32=0, u64=0}}], 1024, 0) = 1
epoll_wait(3, [{EPOLLIN|EPOLLOUT, {u32=0, u64=0}}], 1024, 0) = 1
read(6, "`\0\0\0\0\10:\377\376\...\216\226g\377\2\0\0\0\0\0\0"..., 1024) = 48
...
read(6, 

I just tried tokio 0.1 without async and get the same result there.

@dbcfd
Contributor

dbcfd commented Aug 15, 2019

Even with an enlarged thread pool and no timeout on threads, this still occurs for me, and with low CPU usage as well.

Would love to be able to debug this better, but threadpool::worker logging is trace-only and a little spammy for figuring out what is going on. Luckily I can debug on the box, but perf wasn't informative; I'll try attaching gdb next.
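
For reference, one way to surface that trace logging is the sketch below (this assumes the env_logger crate, and that tokio 0.1's threadpool logs under the tokio_threadpool target):

fn main() {
    // Equivalent to running with RUST_LOG=tokio_threadpool=trace.
    std::env::set_var("RUST_LOG", "tokio_threadpool=trace");
    env_logger::init();

    // ... build and run the runtime as before ...
}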

@colepoirier

I'm having the same issue. I was trying to do a tokio 'hello world' exercise to get started in the tokio/async world.

#![feature(async_await)]

extern crate tokio;

#[tokio::main(multi_thread)]
async fn main() {
    for x in 0..10 {
        tokio::spawn(async move {
            println!("Hello {}", x);
        });
    }
}

Output:

    Finished release [optimized] target(s) in 23.76s
     Running target/release/tokio_test_2
Hello 0
Hello 1
Hello 3
Hello 2
Hello 4
Hello 6
Hello 5

Cargo.toml:

[package]
name = "tokio_test_2"
version = "0.1.0"
authors = ["Me"]
edition = "2018"

[dependencies]
tokio = "0.2.0-alpha.2"

@dbcfd
Contributor

dbcfd commented Aug 21, 2019

@colepoirier Your issue is likely due to an early exit: main returns before the spawned tasks have run. You'll need some way to indicate that main is done, for example by using a channel to send a result when each async function completes and having main receive all the completions.

@colepoirier

colepoirier commented Aug 23, 2019

Thanks for your help @dbcfd! Unfortunately, with the modification of sending a result over the channel, it's somewhat better but still not consistent. Am I not using the channel correctly?

With the code:

#![feature(async_await)]

extern crate crossbeam_channel;
extern crate tokio;

#[tokio::main(multi_thread)]
async fn main() {
    let (s, r) = crossbeam_channel::bounded::<Result<(), ()>>(10);
    for x in 0..10 {
        let s = s.clone();
        tokio::spawn(async move {
            println!("Hello {}", x);
            if let Ok(_) = s.try_send(Ok(())) {
                ()
            };
        });
    }
    if let Ok(_) = r.recv() {
        ()
    }
}

I still get these results:

tokio_test_2 colepoirier $ cargo run --release
    Finished release [optimized] target(s) in 0.07s
     Running `target/release/tokio_test_2`
Hello 3
Hello 0
Hello 1
Hello 2
Hello 4
Hello 5

@dbcfd
Contributor

dbcfd commented Aug 23, 2019

Possibly it's erroring on the send, but I'm going to grab your code and try it as well.

Still seeing my issue after upgrading to tokio 0.2, but this gives me a much better test to debug with.

@dbcfd
Copy link
Contributor

dbcfd commented Aug 23, 2019

Oh, you need to receive all 10 messages; your code only calls recv() once.

@dbcfd
Contributor

dbcfd commented Aug 23, 2019

use std::time::{Duration, Instant};

#[tokio::main(multi_thread)]
async fn main() -> Result<(), ()> {
    let tasks = 100;

    let (s, r) = crossbeam_channel::bounded::<usize>(tasks);
    for x in 0..tasks {
        let s = s.clone();
        tokio::spawn(async move {
            println!("Running {}", x);
            tokio::timer::Delay::new(Instant::now() + Duration::from_millis(100)).await;
            if let Err(e) = s.try_send(x) {
                panic!("Failed to send: {:?}", e);
            }
        });
    }
    let mut received_messages = 0;
    let mut received_values = vec![];
    while received_messages < tasks {
        match r.recv() {
            Err(e) => panic!("Failed to receive: {:?}", e),
            Ok(v) => {
                received_values.push(v);
                received_messages += 1;
            }
        }
    }

    received_values.dedup_by_key(|x| *x);
    println!("Received: {:?}", received_values);

    assert_eq!(received_values.len(), tasks);

    Ok(())
}

Although all the tasks run, some tasks take quite a while to run.

Received: [9, 2, 3, 13, 27, 43, 0, 1, 40, 34, 93, 33, 92, 89, 91, 32, 4, 31, 30, 39, 29, 88, 74, 70, 87, 73, 25, 52, 90, 51, 86, 78, 84, 50, 24, 28, 85, 77, 41, 11, 49, 8, 38, 7, 83, 37, 60, 47, 59, 81, 58, 26, 19, 46, 45, 5, 44, 48, 63, 18, 17, 71, 36, 23, 76, 35, 12, 94, 95, 22, 10, 62, 96, 67, 21, 98, 97, 61, 82, 66, 20, 75, 16, 65, 57, 15, 6, 64, 14, 99, 69, 56, 68, 80, 54, 53, 55, 42, 72, 79]

@cynecx
Contributor

cynecx commented Sep 1, 2019

@ntkoopman I've done some digging with your test case and it seems it's not related to tokio. The blocking read indicates that the underlying file descriptor is not in non-blocking mode. I've modified rust-tun so that setting the fd to non-blocking mode is possible (https://github.com/cynecx/rust-tun/tree/non_blocking); a sketch of what that amounts to is below. The test case seems to run correctly with the tun fd set to non-blocking mode.
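
For context, putting a file descriptor into non-blocking mode boils down to a fcntl call; a minimal sketch (assuming the libc crate and access to the raw fd):

use std::io;
use std::os::unix::io::RawFd;

// Set O_NONBLOCK on a raw file descriptor; essentially what the
// linked rust-tun branch enables for the tun device.
fn set_nonblocking(fd: RawFd) -> io::Result<()> {
    let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
    if flags < 0 {
        return Err(io::Error::last_os_error());
    }
    if unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) } < 0 {
        return Err(io::Error::last_os_error());
    }
    Ok(())
}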

@cynecx
Contributor

cynecx commented Sep 1, 2019

@dbcfd By the way, I am not 100% sure that using crossbeam-channel inside an async fn is a good idea. The issue is that the wake-up model is different with crossbeam-channel: a recv operation may actually park the executing system thread, which blocks one of the threads that tokio's runtime uses.

#[tokio::main(multi_thread)]
async fn main() -> Result<(), ()> {
    let tasks = 100;

    let (s, mut r) = tokio::sync::mpsc::channel(tasks);
    for x in 0..tasks {
        let mut s = s.clone();
        tokio::spawn(async move {
            println!("Running {}", x);
            tokio::timer::delay(Instant::now() + Duration::from_millis(100)).await;
            s.send(1).await;
        });
    }

    let mut received_messages = 0;
    while received_messages < tasks {
        match r.recv().await {
            None => panic!("Failed to receive"),
            Some(v) => {
                received_messages += v;
            }
        }
    }
    assert_eq!(received_messages, tasks);

    Ok(())
}

The modified test case completes instantaneously; however, I haven't really experienced a noticeable delay with the original test case either.

@dbcfd
Contributor

dbcfd commented Sep 1, 2019

@cynecx I do think the issue I'm seeing is some interplay between crossbeam and tokio, even with a try_recv rather than a blocking recv. My test case completes fairly quickly, but some tasks get starved, e.g. task 5: about 40 tasks run before it does.

@colepoirier

Thank you for your help! Trying to use tokio became more trouble than it was worth, so I switched to the runtime crate, and it works flawlessly. No weird bugs like this so far.

@carllerche
Member

Sorry to hear you have been hitting trouble, but I will close the issue as I see no actionable items.

For others: the issue is the use of blocking operations on the event loop. The solution is to use tokio::sync::mpsc instead of crossbeam. Using runtime would have the same problem, so I am not immediately sure why it is not apparent there. My guess is that runtime spawns far more threads by default: tokio spawns threads based on the number of physical cores, and runtime presumably does not.
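
To make the distinction concrete, here is a minimal sketch of the two patterns (using the tokio 0.2 alpha API from the examples above):

// Blocks the runtime worker thread while waiting: other tasks scheduled
// on that thread are starved until a message arrives.
async fn blocking_wait(r: crossbeam_channel::Receiver<u32>) -> Option<u32> {
    r.recv().ok()
}

// Yields back to the executor while waiting: the worker thread stays
// free to poll other tasks.
async fn async_wait(mut r: tokio::sync::mpsc::Receiver<u32>) -> Option<u32> {
    r.recv().await
}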

@colepoirier

Now I’ve been using runtime for a little while and everything just works as you’d expect it to, i.e. it does not have the same problems with crossbeam queues and channels. I’ve had nothing but problems with tokio 0.1 and tokio 0.2-alpha.x.
