-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reimplement fifo as a proper lock free SCSP ring buffer #13877
base: 2.5
Are you sure you want to change the base?
Conversation
There are loads of ringbuffers in mixxx already and we already vendor https://github.com/rigtorp/SPSCQueue which has been proven to be correct. |
Sorry for not making mention of this earlier. I leave the decision to you whether you want to keep this implementation or use |
Oh, it is unfortunate I didn't know that. Also unfortunate that FIFO wasn't using that. Can we bring all these ring buffers back to a single implementations? |
rigtorp/SPSCQueue.h is not comparable though. It uses queue semantics, to push and pop single items. The ringbuffer I implemented (and what PaUtilRingBuffer does) is to read / write buffers (typically of samples), either by copy or by accessing regions in the ringbuffer directly. |
Thanks. I'll try to give this a review. Though I unfortunately don't have the most experience with multithreaded programming. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple surface-level comments. I'll follow up with a proper audit of the thread safety aspects later.
src/test/fifotest.cpp
Outdated
std::vector<float> data(1024); | ||
FIFO<float> fifo(1024); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a (potentially Value-Parameterized) test that tests a couple more (non power-of-two) queue sizes would make sense, wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, also the offset near the numeric limit of the indices should be parametrized
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note though that internally the queue sizes are always rounded up to the next power of two. Still, good to add parametrized tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, but if the size and the rounded up size are always the same and the implementation mistakenly uses the wrong values, the edge case is not caught. So even though it may not matter with the current implementation we should add a unit test to ensure this mistake is not made accidentally in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, sure, and once parametrized, we can add whatever size.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done. please check and resolve.
src/util/fifo.h
Outdated
std::memcpy(pData, m_data.data() + (readIndex & m_mask), n * sizeof(DataType)); | ||
std::memcpy(pData + n, m_data.data(), (count - n) * sizeof(DataType)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pretty sure this requires https://en.cppreference.com/w/cpp/types/is_trivially_copyable, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, so then we should either consider adding a static_assert
for that or use the appropriate more generic algorithm (std::copy
?).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I’ll change it to std::copy. Old habits die hard and I often notice that you are not plagued by the baggage of over 30 years of hardcore C and primitive C++ 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::copy is slow:
Line 62 in 2397016
// Benchmark results on 32 bit SSE2 Atom Cpu (Linux) |
We want here a replacement for the original implementation, that only works with trivial types. For C++ objects, we have already an alternative.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fyi, A quick and dirty quickbench reveals that there is zero difference (apart from the zero-sized case, in which case std::copy
wins).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice. Thank you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I agree with Niko, my tests show the same. I find the for loop in mixxx/src/util/sample.h more dubious!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that probably worked around a missed inefficiency on 32-bit platforms, but I think that edge case is negligible nowadays. Who runs 32-bit binaries let alone 32-bit operating systems anymore (if 64-bit binaries are available)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using std::copy, please check and resolve.
src/util/fifo.h
Outdated
|
||
#include "util/class.h" | ||
#include "util/math.h" | ||
|
||
using ring_buffer_size_t = uint32_t; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you share some insight why not just std::size_t
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also works for me. Or uint_fast32_t. I gladly adapt to what you think fits the mixxx codebase best.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for indexing into std::vector
it always converts to a std::size_t
anyways, so if we want to avoid that one sign extension instruction we should use whats most compatible with the underlying container (and avoid exposing it if possible).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. I’ll change it to size_t
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using std::size_t, please check and resolve.
std::vector<DataType> m_data; | ||
PaUtilRingBuffer m_ringBuffer; | ||
std::atomic<uint32_t> m_writeIndex; | ||
std::atomic<uint32_t> m_readIndex; | ||
DISALLOW_COPY_AND_ASSIGN(FIFO); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you implement move semantics and adhere to the rule of 5?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’m just following the original FIFO here, apparently this was sufficient for the current use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haven't looked at the diff, but I have a suspicion that the original implementation was written before move semantics were a thing (C++11
), so we might as well go the extra step. We then may also look for unnecessarily heap-allocated FIFO
s that workaround the lack of move semantics and inline those (the eliminated pointer dereference may result in a nice runtime speedup).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, but let’s do that in a follow up PR. The goal of these PRs is to get rid of all the tsan warnings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The heap allocation happens twice. One time by the std::vector and probably a second time for the control structure.
We may consider to use a std::array() based version where the size becomes a template parameter.
This way we get also around of the unneccessary default initalization of the vector.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If its feasible to shift the size calculation to compile-time, I'm all for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't looked at the actual use of this, so I don't know if using size as a template argument is even an option. Anyway, let's keep additional improvements beyond fixing the tsan issue for later PRs.
src/util/fifo.h
Outdated
uint32_t readAvailable() const { | ||
const uint32_t readIndex = m_readIndex.load(std::memory_order_relaxed); | ||
const uint32_t writeIndex = m_writeIndex.load(std::memory_order_acquire); | ||
return writeIndex - readIndex; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment about the case if a read or write happens between the wo atomic assesses?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cam you please also explain the use of the memory barriers? Is it correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I can write some comments about the atomics and the memory barriers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comments added. please review and resolve.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not see comment about the memory barriers and what happens if read and write happens in between reading the two atomics. Is it not yet pushed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is the last comment, after the declaration of the two atomics.
Without inspecting the code: Implementing lock-free data structures on your own is most often not a good idea. Not only in terms of correctness. It is also a maintenance burden. I have made this mistake myself. It is never too late to change your mind. Just a friendly advice. |
Agreed. This is probably a good time to post CppCon 2014: Herb Sutter "Lock-Free Programming (or, Juggling Razor Blades) |
This implementation supports bulk operations and should cover most use cases: https://github.com/cameron314/concurrentqueue With a single producer/consumer it should behave like an ordinary SPSC queue. Instead if vendoring a second library it might be worth migrating the |
In that case let's stick with the established Portaudio version. It is exactly designed for our use case of fast bulk transfer of samples and is just missing sanitizer animations. |
No, please! Thread sanitizer flags it up! Sure, it will work, but it is not correct. Also, seeing that Portaudio has not been thread sanitised, I would not be surprised if the hanger I experienced comes from Portaudio itself. But I consider code that does not execute cleanly with thread sanitizer broken! I know the pitfalls of lock-free programming very well. I am confident this implementation is correct. I am not against using other options, but I needed something that would easily adapt to the FIFO class API. None of the mentioned options fit this purpose. They are about reading / writing single items, not about reading / writing blocks of memory and accessing regions in the ringbuffer. If you consider that there is some opensource and established solution that adapts well to the FIFO class API, be my guest. But in the meantime, please don't block this PR until there is a better option. |
I am going to use revert the FIFO API to use int. I am having to touch way to many files to fixes the windows compilation. |
With force push, we need to review the whole PR over and over again. It is better to add commit that we can track the changes. If you like to have finally only one commit, feel free to squash everything before merge into the target branch. |
Yeah, sorry about that. I had all these casts added allover the code to make windows accept size_t instead of int as the return type, and I checkout just have checked out those files from 2.5 instead of rebase. Anyway, the only things to review are fifo.h itself and the fifotest.cpp, nothing else has changed now (in other words, it really is a drop-in replacement). |
Thank you for the coment about the atomics. This puts a certein requirement on readAvailable() and writeAvailable() can you document that at the function itself? Both read two atomics. Can you also described how it is deal with the situation a read and write happens betwwen the both accesses? I am a bit concerned, bcause the original implementation PaUtil_FullMemoryBarrier() diffrently. Is PaUtil_RingBuffer() broken because of this? I am in doubt, because it is established code we use without any issues for years. On the other hand we may introduce here extra barieres that may slow down the queue unnecessarily. However if we are convinced it is broken actually broken, we will suffer the same issue when using it via portaudio and we should contribut a fix back. Thats the reason why need to full undertand the whole topic. |
I think the fact that these are missing is by itself an argument against using it since that indicates IMO that the code is not well maintained (also evident from the commit history). Moreover, does it also make sense to make ourselves less dependent on PortAudio if we ever want to switch to another library that does care about features such as hotplug.
This is a C++ implementation though and the PA code is pure C, a port wouldn't be trivial if even feasible. |
I am not talking abaout contributing this new C++ Fifo back. If we are certein that the Portaudio FIFO has an issue, we need to point it out and propose a fix in the C domain. |
For me the fact that thread sanitizer flags this is up is sufficient. I treat this the same as compiler warnings. One might think that a warning is harmless, and if so, even use pragmas to disable a warning. I much prefer to treat all compiler warnings as errors, and actually fix them. I don't feel like going down the rabbit hole of investigating the PortAudio source code. I am sure this implementation is correct and it allows me to continue my thread sanitizer investigation. If you don't want to use it and prefer to stick with the original PortAudio ringbuffer, I would find that disappointing. But since it is a drop-in replacement, I will use it during my thread sanitizer investigation and that's it. And I will use it for my local builds. As for efficiency, there is one thing missing: the atomics should be aligned with the cache size, as documented here: |
It looks like we have mixed up things in the discussion. Let's clarify this:
Regarding this topic I have some concerns whether that the memory barriers are working but not unnecessarily introduce a performance penalty. Can you please describe it as source code comment?
I am exactly the same opinion.
That's OK for me, but the one who is stepping in needs your probably your help. Is that OK? I have the following ideas:
I think 1. is easy. Just uses these defines:
Can you confirm this? Do you have a CI run or something we can use for this test?
|
I added some multithread R/W tests. This is running without thread sanitizer. The variations are, for both my implementation and the PA ring buffer:
The test duration serves as a simple benchmark. As you can see, the differences are negligible. [----------] 8 tests from FifoTest As there is no winner performance wise, I see the following ways forwards:
The advantage of 2 is that we don't depend on PA in case we ever want to migrate to something else (I might consider using CoreAudio directly on macOS, if I would address the hot plug issues) and that we don't have to go through the hassle of patching PA. The advantage of 1 is that the PA ringbuffer has seen more milage than my implementation, but I am fully confident it is correct (and the tests and tsan show this). I don't have a strong opinion, so unless we all agree on 2, I think 1.b is the easiest way forward, and we keep my implementation lying around just in case we need in the future. And at least we have the FIFO covered with some unit tests! But if you feel like doing 1.a, adding tsan annotations to PA audio and provide me with a patch, I am happy to test that. |
// ringbuffer, the remainder is read from the start. | ||
int read(DataType* pData, size_type count) { | ||
size_type readIndex = m_readIndex.load(std::memory_order_relaxed); | ||
const size_type writeIndex = m_writeIndex.load(std::memory_order_acquire); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here as well, I think m_readIndex.load() can be moved after m_writeIndex.load().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No issue, same as above.
I consider it broken in the sense that it hasn't been updated to run correctly with thread sanitizer. |
Did you consider to add a single option to our CMakeList.txt that adds all the required options for tsan? That would make future work with it more easy? |
// Returns the space in the ringbuffer available for write | ||
int writeAvailable() const { | ||
const size_type readIndex = m_readIndex.load(std::memory_order_acquire); | ||
const size_type writeIndex = m_writeIndex.load(std::memory_order_relaxed); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following the logic below should we swap both loads?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it doesn't matter. The important thing here is that the m_readIndex is loaded with std::memory_order_acquire, as it is changed in the other thread, and we want to make sure we see changed value here. m_writeIndex will not be modified in any other thread than the current thread, so it's correct value will always be guaranteed in this thread. Swapping the two loads would make no difference.
DataType** dataPtr2, | ||
ring_buffer_size_t* sizePtr2) { | ||
const size_type readIndex = m_readIndex.load(std::memory_order_acquire); | ||
size_type writeIndex = m_writeIndex.load(std::memory_order_relaxed); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same here, swap statements?
} | ||
// Advance the read index with count values, or maximum until the write index. | ||
// Returns the new read index (wrapped inside the buffer size) | ||
int flushReadData(size_type count) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this function allowed from reader, writer or both? Do we need a second one for the other counterpart?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only allowed from reader, but apparently it is only used once (in ./engine/sidechain/shoutconnection.cpp). We do not need a second one for the counter part and in fact we might as well remove this one, and simply call releaseReadRegions there, which would amount to the same.
And releaseReadRegions and releaseWriteRegions would be more aptly named advanceReadIndex, advanceWriteIndex. But all of that is IMO beyond the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, so documenting this as a source comment is sufficient.
// ringbuffer, the remainder is read from the start. | ||
int read(DataType* pData, size_type count) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add overloads with that take std::span
s instead that we use in new code instead? We can keep the ptr+size ones since this is a drop-in replacement if we document that the span-based one is preferred (assuming you agree with that). Ideally the implementation would also live in the std::span
variation instead, but I can understand if you don't want to do that refactoring. I can offer to do that refactoring instead in exchange if you review that refactored code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, but that is a refactoring of this class, independent of the underlying implementation, be it PA ring buffer, or my implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, you're right. Lets concentrate on getting the lock-free stuff right first.
Note: As mentioned with respect to the memory ordering: the read functions should only be called in the consumer thread, and the write functions only in the producer thread. But there is indeed nothing in the API that enforces this. But likewise, there is also nothing that protects the FIFO from being used by more than a single consumer and single producer. I can think of mechanisms to enforce this (at a cost), but I think it should be enough to simply document it. TSAN will detect such abuse anyway :-) |
Here is the pa_ringbuffer with tsan functions: |
I think in terms of functionality this is ready to go. I have two editorial complains:
After our discussion everything is obvious. My goal is that new readers shall not stumble the same way. |
Thanks, I will when I have a moment! |
I added -g flag
but I didn't get debug symbols, so I guess the ringbuffer from libportaudio itself is used? Anyway, I replaced PaUtil with PaMixxx and the local files and now I do get line info. And still tsan warnings, despite the _tsan.... barriers. Details
|
writeIndex = writeIndex & m_mask; | ||
const size_type n = std::min(m_size - writeIndex, count); | ||
std::copy(pData, pData + n, m_data.data() + writeIndex); | ||
std::copy(pData + n, pData + count, m_data.data()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This saves some µs
std::copy(pData + n, pData + count, m_data.data()); | |
if ((count - n) > 0) { | |
std::copy(pData + n, pData + count, m_data.data()); | |
} |
Testes with Intel Core Ultra 5 125U
Before
debug [Main] Stat("read 1","count=2714,sum=594029ns,average=218.876ns,min=29ns,max=6259ns,variance=61965ns^2,stddev=248.928ns")
debug [Main] Stat("read 65536","count=68,sum=1.38058e+06ns,average=20302.6ns,min=61ns,max=187712ns,variance=7.63838e+08ns^2,stddev=27637.6ns")
debug [Main] Stat("write 1","count=187,sum=55599ns,average=297.321ns,min=40ns,max=866ns,variance=12077.8ns^2,stddev=109.899ns")
debug [Main] Stat("write 2048","count=876,sum=1.54796e+06ns,average=1767.07ns,min=712ns,max=14696ns,variance=393266ns^2,stddev=627.109ns")
After
debug [Main] Stat("read 1","count=2726,sum=570707ns,average=209.357ns,min=28ns,max=3766ns,variance=41299.8ns^2,stddev=203.224ns")
debug [Main] Stat("read 65536","count=78,sum=1.51157e+06ns,average=19379.1ns,min=73ns,max=184084ns,variance=7.06956e+08ns^2,stddev=26588.6ns")
debug [Main] Stat("write 1","count=183,sum=44592ns,average=243.672ns,min=70ns,max=619ns,variance=10338.9ns^2,stddev=101.681ns")
debug [Main] Stat("write 2048","count=994,sum=1.70078e+06ns,average=1711.05ns,min=625ns,max=15022ns,variance=464792ns^2,stddev=681.757ns")
``
readIndex = readIndex & m_mask; | ||
const size_type n = std::min(m_size - readIndex, count); | ||
std::copy(m_data.data() + readIndex, m_data.data() + readIndex + n, pData); | ||
std::copy(m_data.data(), m_data.data() + count - n, pData + n); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::copy(m_data.data(), m_data.data() + count - n, pData + n); | |
if ((count - n) > 0) { | |
std::copy(m_data.data(), m_data.data() + count - n, pData + n); | |
} |
Reimplement fifo as a proper lock free SCSP ring buffer with atomics for thread-safety instead of using PaUtilRingBuffer as a backend.
Added unit tests.
One might consider that the tsan warnings for PaUtilRingBuffer or not "serious" but they sure are ugly and spam my tsan logs.
Fixes #13864 #13863 #13866 #13868