os.sched_yield() + range() unexpected influence #121512

Closed
socketpair opened this issue Jul 8, 2024 · 14 comments

Labels
type-bug An unexpected behavior, bug, or error

Comments

@socketpair
Contributor

socketpair commented Jul 8, 2024

Bug report

Bug description:

The two counters printed are expected to be almost the same, and for most arguments they are. But for some range() arguments, calling sched_yield() changes the picture significantly. Note that sched_yield() slows down the OTHER thread and unexpectedly speeds up the current one.

python3 --version
Python 3.12.3
Linux Fedora 39, kernel 6.9.4-100.fc39.x86_64
from os import sched_yield
from queue import SimpleQueue
from threading import Thread
from time import monotonic

period = 2


def _thread(queue1: SimpleQueue) -> None:
    consumed = 0
    start = monotonic()
    while queue1.get():
        consumed += 1
    end = monotonic()
    print(f'Consumed: {consumed / 1000 / (end - start):12.2f} Kitems/sec.')


def main():
    for arg in (1, 10, 100, 1000, 10_000, 100_000, 1000_000, 10_000_000):
        print(f'Testing for {arg=}')
        queue: SimpleQueue[bool] = SimpleQueue()
        thread = Thread(target=_thread, args=(queue,), daemon=True)
        thread.start()
        cnt = 0
        deadline = monotonic() + period
        while monotonic() < deadline:
            for i in range(arg):
                cnt += 1
                if queue.qsize() <= 1048576:
                    queue.put(True)
            sched_yield()  # with commented out - no bugs for any `range()`
            # time.sleep(0) # no bugs for any `range()`
        stop = monotonic()
        queue.put(False)
        thread.join()
        print(f'Produced: {cnt / 1000 / (stop - (deadline - period)):12.2f} Kitems/sec.')


main()
Testing for arg=1
Consumed:       644.82 Kitems/sec.
Produced:       644.83 Kitems/sec.
Testing for arg=10
Consumed:      3981.64 Kitems/sec.
Produced:      3981.68 Kitems/sec.
Testing for arg=100
Consumed:      3809.66 Kitems/sec.  <----- BUG
Produced:     15519.08 Kitems/sec.
Testing for arg=1000
Consumed:      1192.78 Kitems/sec.  <----- BUG
Produced:     19605.59 Kitems/sec.
Testing for arg=10000
Consumed:       585.87 Kitems/sec.  <----- BUG
Produced:     19829.86 Kitems/sec.
Testing for arg=100000
Consumed:     10839.18 Kitems/sec.
Produced:     10883.60 Kitems/sec.
Testing for arg=1000000
Consumed:     11005.64 Kitems/sec.
Produced:     11011.80 Kitems/sec.
Testing for arg=10000000
Consumed:     10800.30 Kitems/sec.
Produced:     10805.39 Kitems/sec.
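
The mismatched runs in output like the above can also be picked out mechanically instead of by eye. A minimal sketch, assuming the output format printed by the script; the helper name `find_mismatches` and the 5% tolerance are illustrative choices, not part of the report:

```python
import re

# Matches "Testing for arg=N" headers and "Consumed:/Produced: X Kitems/sec." lines.
LINE_RE = re.compile(
    r"^(Testing for arg=(?P<arg>\d+)"
    r"|(?P<kind>Consumed|Produced):\s+(?P<rate>[\d.]+) Kitems/sec\.)"
)


def find_mismatches(output: str, tolerance: float = 0.05) -> list[tuple[int, float, float]]:
    """Return (arg, consumed, produced) for runs whose rates differ by more than `tolerance`."""
    mismatches = []
    arg = None
    rates: dict[str, float] = {}
    for line in output.splitlines():
        match = LINE_RE.match(line.strip())
        if match is None:
            continue
        if match["arg"] is not None:
            # New test run: remember its argument and reset the collected rates.
            arg = int(match["arg"])
            rates = {}
            continue
        rates[match["kind"]] = float(match["rate"])
        if arg is not None and len(rates) == 2:
            consumed, produced = rates["Consumed"], rates["Produced"]
            if abs(produced - consumed) > tolerance * max(produced, consumed):
                mismatches.append((arg, consumed, produced))
    return mismatches


sample = """\
Testing for arg=10
Consumed:      3981.64 Kitems/sec.
Produced:      3981.68 Kitems/sec.
Testing for arg=100
Consumed:      3809.66 Kitems/sec.  <----- BUG
Produced:     15519.08 Kitems/sec.
"""
print(find_mismatches(sample))
```

Trailing annotations such as the BUG markers are ignored, so the same helper can be run on unannotated output from other machines.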

CPython versions tested on:

3.12

Operating systems tested on:

Linux

@socketpair socketpair added the type-bug An unexpected behavior, bug, or error label Jul 8, 2024
@socketpair
Contributor Author

P.S.

            for i in range(arg):
                cnt += 1
                queue.put(True)

gives the same effect, but for arguments in the range 1000...100_000

@socketpair
Contributor Author

#96078 ?

@Zheaoli
Contributor

Zheaoli commented Jul 9, 2024

@Eclips4 would you mind assigning this issue to me?

@Zheaoli
Contributor

Zheaoli commented Jul 28, 2024

Problem solved. I guess you are using an Intel CPU (10th gen or later?)

sched_yield may move the process to a different CPU core, so we get different speeds on the performance cores and the efficiency cores.

You can pin the process to a single core:

import os
from os import sched_yield
from queue import SimpleQueue
from threading import Thread
from time import monotonic

period = 2

# Pin the main thread to CPU 1; threads started afterwards inherit the mask.
pid = os.getpid()
cpu_set = {1}
os.sched_setaffinity(pid, cpu_set)

affinity = os.sched_getaffinity(pid)
print(f"Current CPU affinity: {affinity}")

def _thread(queue1: SimpleQueue) -> None:
    consumed = 0
    start = monotonic()
    while queue1.get():
        consumed += 1
    end = monotonic()
    print(f'Consumed: {consumed / 1000 / (end - start):12.2f} Kitems/sec.')


def main():
    for arg in (1, 10, 100, 1000, 10_000, 100_000, 1000_000, 10_000_000):
        print(f'Testing for {arg=}')
        queue: SimpleQueue[bool] = SimpleQueue()
        thread = Thread(target=_thread, args=(queue,), daemon=True)
        thread.start()
        cnt = 0
        deadline = monotonic() + period
        while monotonic() < deadline:
            for i in range(arg):
                cnt += 1
                if queue.qsize() <= 1048576:
                    queue.put(True)
            sched_yield()  # with commented out - no bugs for any `range()`
            # time.sleep(0) # no bugs for any `range()`
        stop = monotonic()
        queue.put(False)
        thread.join()
        print(f'Produced: {cnt / 1000 / (stop - (deadline - period)):12.2f} Kitems/sec.')


main()

@socketpair
Contributor Author

Yes, 12th Gen Intel(R) Core(TM) i7-1260P

@Zheaoli
Contributor

Zheaoli commented Jul 28, 2024

> Yes, 12th Gen Intel(R) Core(TM) i7-1260P

You can try sched_setaffinity to confirm that the problem goes away.
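
For a quick experiment it can be convenient to pin only for the duration of a test and restore the previous mask afterwards. A minimal sketch, assuming Linux (where `os.sched_setaffinity` is available); the context manager name `pinned_to` is made up for illustration, and the CPU is taken from the currently allowed set because a hardcoded CPU number may not be permitted on every machine:

```python
import os
from contextlib import contextmanager


@contextmanager
def pinned_to(cpus):
    """Temporarily restrict the calling thread to `cpus`, then restore the old mask."""
    previous = os.sched_getaffinity(0)  # 0 means "the calling thread"
    os.sched_setaffinity(0, cpus)
    try:
        yield os.sched_getaffinity(0)
    finally:
        os.sched_setaffinity(0, previous)


allowed = os.sched_getaffinity(0)
cpu = min(allowed)  # pick a CPU we are actually allowed to run on
with pinned_to({cpu}) as active:
    # Threads started inside this block inherit the restricted mask.
    print(f"Pinned to: {active}")
print(f"Restored: {os.sched_getaffinity(0)}")
```

Threads that were already running before the block keep their own masks, so the benchmark threads would need to be started inside it.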

@CharlesChen0823

CharlesChen0823 commented Jul 28, 2024

I am using an AMD 7950X. With the first test case, the result is the same as in the original issue.

Testing for arg=1
Consumed:       535.82 Kitems/sec.
Produced:       535.84 Kitems/sec.
Testing for arg=10
Consumed:      2799.96 Kitems/sec.
Produced:      2800.04 Kitems/sec.
Testing for arg=100
Consumed:      9362.83 Kitems/sec.
Produced:      9363.19 Kitems/sec.
Testing for arg=1000
Consumed:     11650.44 Kitems/sec.
Produced:     14064.84 Kitems/sec.
Testing for arg=10000
Consumed:       658.91 Kitems/sec.
Produced:     33276.69 Kitems/sec.
Testing for arg=100000
Consumed:       657.21 Kitems/sec.
Produced:     34852.55 Kitems/sec.
Testing for arg=1000000
Consumed:     17089.16 Kitems/sec.
Produced:     17111.93 Kitems/sec.
Testing for arg=10000000
Consumed:     17127.53 Kitems/sec.
Produced:     17141.92 Kitems/sec.

Using sched_setaffinity, the result is:

Current CPU affinity: {1}
Testing for arg=1
Consumed:       113.57 Kitems/sec.
Produced:       113.57 Kitems/sec.
Testing for arg=10
Consumed:      1012.79 Kitems/sec.
Produced:      1012.80 Kitems/sec.
Testing for arg=100
Consumed:      6451.54 Kitems/sec.
Produced:      6451.63 Kitems/sec.
Testing for arg=1000
Consumed:     14954.24 Kitems/sec.
Produced:     14954.53 Kitems/sec.
Testing for arg=10000
Consumed:     15170.43 Kitems/sec.
Produced:     15170.62 Kitems/sec.
Testing for arg=100000
Consumed:     17175.67 Kitems/sec.
Produced:     17175.93 Kitems/sec.
Testing for arg=1000000
Consumed:     15528.18 Kitems/sec.
Produced:     15528.39 Kitems/sec.
Testing for arg=10000000
Consumed:     15520.81 Kitems/sec.
Produced:     15520.98 Kitems/sec.

If I remember correctly, AMD CPUs don't have separate performance and efficiency cores?

These results are from Ubuntu running in VMware with Python 3.12.4.
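
Whether a given Linux machine exposes two core types can be checked from sysfs rather than from memory. A rough sketch, assuming that hybrid Intel systems on recent kernels expose `cpu_core` and `cpu_atom` devices under `/sys/devices`, each with a `cpus` file; that layout is an assumption here, and both function names are illustrative:

```python
from pathlib import Path


def parse_cpu_list(text: str) -> set[int]:
    """Parse a kernel CPU list such as '0-7,16-19' into a set of CPU numbers."""
    cpus: set[int] = set()
    for part in text.strip().split(","):
        if not part:
            continue
        first, _, last = part.partition("-")
        cpus.update(range(int(first), int(last or first) + 1))
    return cpus


def hybrid_core_sets(sysfs_root: str = "/sys/devices") -> dict[str, set[int]]:
    """Return the CPUs of each core type, or {} when the files are missing.

    Assumption: a missing file means a non-hybrid CPU or a kernel that does
    not expose this layout; it is not proof either way.
    """
    result: dict[str, set[int]] = {}
    for name in ("cpu_core", "cpu_atom"):
        path = Path(sysfs_root) / name / "cpus"
        try:
            result[name] = parse_cpu_list(path.read_text())
        except OSError:
            return {}
    return result


print(hybrid_core_sets() or "no hybrid core information found")
```

Inside a virtual machine the guest may not see the host's core types at all, so an empty result there says little about the physical CPU.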

@Zheaoli
Contributor

Zheaoli commented Jul 28, 2024

> If I remember correctly, AMD CPUs don't have separate performance and efficiency cores?

Yes, you are right.

Let's dig into this in more detail.
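
One piece of detail that is cheap to collect is whether the calling thread actually changes CPU across sched_yield() calls. A minimal sketch, assuming Linux with a libc that provides `sched_getcpu()`, reached here through `ctypes`; the helper names `current_cpu` and `count_migrations` are hypothetical:

```python
import ctypes
import os

# Assumption: the C library loaded into the process exports sched_getcpu().
_libc = ctypes.CDLL(None, use_errno=True)


def current_cpu() -> int:
    """Return the number of the CPU the calling thread is running on."""
    cpu = _libc.sched_getcpu()
    if cpu < 0:
        raise OSError(ctypes.get_errno(), "sched_getcpu failed")
    return cpu


def count_migrations(iterations: int = 10_000) -> int:
    """Count how often the CPU number changes across sched_yield() calls."""
    migrations = 0
    last = current_cpu()
    for _ in range(iterations):
        os.sched_yield()
        cpu = current_cpu()
        if cpu != last:
            migrations += 1
            last = cpu
    return migrations


print(f"CPU changes over 10000 yields: {count_migrations()}")
```

Running it with and without pinning gives a rough indication of how much migration is happening; it does not by itself explain the throughput difference.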

@Zheaoli
Contributor

Zheaoli commented Jul 28, 2024

Interesting. I wrote a C version of the test.

Version without CPU binding:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <stdbool.h>
#include <sys/time.h>
#include <sched.h>
#include <errno.h>

#define QUEUE_SIZE 1048576
#define PERIOD 2
#define WAKEUP_INTERVAL 100

typedef struct {
    bool buffer[QUEUE_SIZE];
    int head;
    int tail;
    pthread_mutex_t lock;
} SimpleQueue;

void init_queue(SimpleQueue *queue) {
    queue->head = 0;
    queue->tail = 0;
    pthread_mutex_init(&queue->lock, NULL);
    for (int i = 0; i < QUEUE_SIZE; i++) {
        queue->buffer[i] = false;
    }
}

bool enqueue(SimpleQueue *queue, bool value) {
    pthread_mutex_lock(&queue->lock);
    int next_tail = (queue->tail + 1) % QUEUE_SIZE;

    if (next_tail == queue->head) {
        // Queue is full
        pthread_mutex_unlock(&queue->lock);
        return false;
    }

    queue->buffer[queue->tail] = value;
    queue->tail = next_tail;
    pthread_mutex_unlock(&queue->lock);
    return true;
}

bool dequeue(SimpleQueue *queue, bool *value) {
    pthread_mutex_lock(&queue->lock);

    if (queue->head == queue->tail) {
        // Queue is empty
        pthread_mutex_unlock(&queue->lock);
        return false;
    }

    *value = queue->buffer[queue->head];
    queue->head = (queue->head + 1) % QUEUE_SIZE;
    pthread_mutex_unlock(&queue->lock);
    return true;
}

double monotonic_time() {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return ts.tv_sec + ts.tv_nsec / 1000000000.0;
}

void *thread_func(void *arg) {
    SimpleQueue *queue = (SimpleQueue *)arg;
    int consumed = 0;
    double start = monotonic_time();

    bool value;
    while (true) {
        if (dequeue(queue, &value) && value) {
            consumed++;
        } else {
            break; // exit loop when dequeue returns false or value is false
        }
    }

    double end = monotonic_time();
    printf("Consumed: %12.2f Kitems/sec.\n", consumed / 1000.0 / (end - start));

    return NULL;
}

int main() {
    int args[] = {1, 10, 100, 1000, 10000, 100000, 1000000, 10000000};

    for (int i = 0; i < sizeof(args) / sizeof(args[0]); i++) {
        int arg = args[i];
        printf("Testing for arg=%d\n", arg);

        SimpleQueue queue;
        init_queue(&queue);

        pthread_t thread;
        pthread_create(&thread, NULL, thread_func, &queue);

        int cnt = 0;
        double deadline = monotonic_time() + PERIOD;

        while (monotonic_time() < deadline) {
            for (int j = 0; j < arg; j++) {
                cnt++;
                if (enqueue(&queue, true)) {
                    if (cnt % WAKEUP_INTERVAL == 0) {
                        sched_yield(); 
                    }
                }
            }
        }

        double stop = monotonic_time();
        enqueue(&queue, false);
        pthread_join(thread, NULL);

        printf("Produced: %12.2f Kitems/sec.\n", cnt / 1000.0 / (stop - (deadline - PERIOD)));
    }

    return 0;
}

The result:

╰─ ./demo1-binary 
Testing for arg=1
Consumed:     12440.90 Kitems/sec.
Produced:     41095.88 Kitems/sec.
Testing for arg=10
Consumed:     10455.09 Kitems/sec.
Produced:     86675.52 Kitems/sec.
Testing for arg=100
Consumed:     10505.89 Kitems/sec.
Produced:     97815.24 Kitems/sec.
Testing for arg=1000
Consumed:     10384.88 Kitems/sec.
Produced:     99435.68 Kitems/sec.
Testing for arg=10000
Consumed:     10458.63 Kitems/sec.
Produced:     99189.41 Kitems/sec.
Testing for arg=100000
Consumed:     10258.47 Kitems/sec.
Produced:     99326.16 Kitems/sec.
Testing for arg=1000000
Consumed:     10368.36 Kitems/sec.
Produced:     97798.35 Kitems/sec.
Testing for arg=10000000
Consumed:     10711.63 Kitems/sec.
Produced:     94015.14 Kitems/sec.

Version with CPU binding:

#define _GNU_SOURCE
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <stdbool.h>
#include <sys/time.h>
#include <sched.h>
#include <errno.h>

#define QUEUE_SIZE 1048576
#define PERIOD 2
#define WAKEUP_INTERVAL 100

typedef struct {
    bool buffer[QUEUE_SIZE];
    int head;
    int tail;
    pthread_mutex_t lock;
} SimpleQueue;

void init_queue(SimpleQueue *queue) {
    queue->head = 0;
    queue->tail = 0;
    pthread_mutex_init(&queue->lock, NULL);
    for (int i = 0; i < QUEUE_SIZE; i++) {
        queue->buffer[i] = false;
    }
}

bool enqueue(SimpleQueue *queue, bool value) {
    pthread_mutex_lock(&queue->lock);
    int next_tail = (queue->tail + 1) % QUEUE_SIZE;

    if (next_tail == queue->head) {
        // Queue is full
        pthread_mutex_unlock(&queue->lock);
        return false;
    }

    queue->buffer[queue->tail] = value;
    queue->tail = next_tail;
    pthread_mutex_unlock(&queue->lock);
    return true;
}

bool dequeue(SimpleQueue *queue, bool *value) {
    pthread_mutex_lock(&queue->lock);

    if (queue->head == queue->tail) {
        // Queue is empty
        pthread_mutex_unlock(&queue->lock);
        return false;
    }

    *value = queue->buffer[queue->head];
    queue->head = (queue->head + 1) % QUEUE_SIZE;
    pthread_mutex_unlock(&queue->lock);
    return true;
}

double monotonic_time() {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return ts.tv_sec + ts.tv_nsec / 1000000000.0;
}

void *thread_func(void *arg) {
    SimpleQueue *queue = (SimpleQueue *)arg;
    int consumed = 0;
    double start = monotonic_time();

    bool value;
    while (true) {
        if (dequeue(queue, &value) && value) {
            consumed++;
        } else {
            break; // exit loop when dequeue returns false or value is false
        }
    }

    double end = monotonic_time();
    printf("Consumed: %12.2f Kitems/sec.\n", consumed / 1000.0 / (end - start));

    return NULL;
}

void set_cpu_affinity() {
    cpu_set_t mask;
    CPU_ZERO(&mask);
    CPU_SET(1, &mask); 
    if (sched_setaffinity(0, sizeof(cpu_set_t), &mask) == -1) {
        perror("sched_setaffinity");
        exit(EXIT_FAILURE);
    }
}

int main() {

    set_cpu_affinity();


    cpu_set_t mask;
    CPU_ZERO(&mask);
    if (sched_getaffinity(0, sizeof(cpu_set_t), &mask) == -1) {
        perror("sched_getaffinity");
        exit(EXIT_FAILURE);
    }

    printf("Current CPU affinity: ");
    for (int i = 0; i < CPU_SETSIZE; i++) {
        if (CPU_ISSET(i, &mask)) {
            printf("%d ", i);
        }
    }
    printf("\n");

    int args[] = {1, 10, 100, 1000, 10000, 100000, 1000000, 10000000};

    for (int i = 0; i < sizeof(args) / sizeof(args[0]); i++) {
        int arg = args[i];
        printf("Testing for arg=%d\n", arg);

        SimpleQueue queue;
        init_queue(&queue);

        pthread_t thread;
        pthread_create(&thread, NULL, thread_func, &queue);

        int cnt = 0;
        double deadline = monotonic_time() + PERIOD;

        while (monotonic_time() < deadline) {
            for (int j = 0; j < arg; j++) {
                cnt++;
                if (enqueue(&queue, true)) {
                    if (cnt % WAKEUP_INTERVAL == 0) {
                        sched_yield(); 
                    }
                }
            }
        }

        double stop = monotonic_time();
        enqueue(&queue, false);
        pthread_join(thread, NULL);

        printf("Produced: %12.2f Kitems/sec.\n", cnt / 1000.0 / (stop - (deadline - PERIOD)));
    }

    return 0;
}

The result:

╰─ ./demo2-binary
Current CPU affinity: 1
Testing for arg=1
Consumed:     99700.91 Kitems/sec.
Produced:     42461.31 Kitems/sec.
Testing for arg=10
Consumed:     96153.85 Kitems/sec.
Produced:     91034.38 Kitems/sec.
Testing for arg=100
Consumed:     93984.99 Kitems/sec.
Produced:    102509.55 Kitems/sec.
Testing for arg=1000
Consumed:     82101.88 Kitems/sec.
Produced:    103914.51 Kitems/sec.
Testing for arg=10000
Consumed:     85397.10 Kitems/sec.
Produced:    104227.12 Kitems/sec.
Testing for arg=100000
Consumed:     83542.15 Kitems/sec.
Produced:    104308.01 Kitems/sec.
Testing for arg=1000000
Consumed:     95147.54 Kitems/sec.
Produced:    103941.68 Kitems/sec.
Testing for arg=10000000
Consumed:    100100.11 Kitems/sec.
Produced:    103952.10 Kitems/sec.

@Zheaoli
Contributor

Zheaoli commented Jul 28, 2024

@Eclips4 would you mind helping me reopen this issue? I think we have not found the root cause yet.

@Eclips4
Member

Eclips4 commented Jul 28, 2024

> @Eclips4 would you mind helping me reopen this issue? I think we have not found the root cause yet.

Do you think the problem is on the CPython side?

@Zheaoli
Contributor

Zheaoli commented Jul 28, 2024

> Do you think the problem is on the CPython side?

99.999% not on the CPython side... lol

@Eclips4
Member

Eclips4 commented Jul 28, 2024

> > Do you think the problem is on the CPython side?
>
> 99.999% not on the CPython side... lol

If this isn't on the CPython side, then I think this issue doesn't need to be reopened. :)

@Zheaoli
Contributor

Zheaoli commented Jul 28, 2024

> If this isn't on the CPython side, then I think this issue doesn't need to be reopened. :)

Got it. I will try to find out more about this, and I may ask you to reopen the issue if I find that the problem is on the CPython side.
