refactor channels.trySend/tryRecv and improve tests #74
Conversation
Currently `trySend` does not follow the API that was introduced in nim-lang/RFCs#347, which produces different code, as is evident from the tchannels_singlebuf test.

```nim
proc trySend*[T](c: Chan[T], src: sink Isolated[T]): bool
template trySend*[T](c: Chan[T], src: T): bool =
  trySend(c, isolate(src))
```

Output of `--expandarc:trySend`:

```nim
var data
data = extract(src)
result = channelSend(c.d, addr(data), 16, 0)
if result:
  wasMoved(data)
`=destroy`(src)
`=destroy`(data)
```

Output of `--expandarc:test`:

```nim
var msg_cursor
var notSent = true
msg_cursor = "Hello"
block :tmp:
  while notSent:
    var :tmpD
    notSent = not
      mixin isolate
      trySend(chan, isolate do:
        :tmpD = `=dup`(msg_cursor)
        :tmpD)
    if notSent:
      atomicInc(attempts, 1)
```

While the following modification:

```nim
proc trySend*[T](c: Chan[T], src: var Isolated[T]): bool
template trySend*[T](c: Chan[T], src: T): bool =
  mixin isolate
  var p = isolate(src)
  trySend(c, p)
```

generates (`--expandarc:trySend`):

```nim
var data
data = extract(src)
result = channelSend(c.d, addr(data), 16, 0)
if result:
  wasMoved(data)
`=destroy`(data)
```

and (`--expandarc:test`):

```nim
var msg_cursor
var notSent = true
msg_cursor = "Hello"
block :tmp:
  while notSent:
    var
      p`gensym0
      :tmpD
    notSent = not
      mixin isolate
      p`gensym0 = isolate do:
        :tmpD = `=dup`(msg_cursor)
        :tmpD
      trySend(chan, p`gensym0)
    if notSent:
      atomicInc(attempts, 1)
    `=destroy`(p`gensym0)
```
template trySend*[T](c: Chan[T], src: T): bool =
mixin isolate
var p = isolate(src)
trySend(c, p) # --expandarc:trySend
var data
data = extract(src)
result = channelSend(c.d, addr(data), 16, 0)
if result:
wasMoved(data)
`=destroy`(data)
# --expandarc:test
var msg_cursor
var notSent = true
msg_cursor = "Hello"
block :tmp:
while notSent:
var
p`gensym0
:tmpD
notSent = not
mixin isolate
p`gensym0 = isolate do:
:tmpD = `=dup`(msg_cursor)
:tmpD
trySend(chan, p`gensym0)
if notSent:
atomicInc(attempts, 1)
`=destroy`(p`gensym0) |
@Araq the changes for now seem correct. FWIW, the `trySend` template should be avoided in favor of:

```nim
proc test(chan: Chan[string]) {.thread.} =
  var notSent = true
  var msg = Message  # Message is a string constant defined elsewhere
  var p = isolate(msg)
  while notSent:
    notSent = not chan.trySend(p)  # reuses p; no copies on retry
    if notSent:
      atomicInc(attempts)
```

Done.
This PR makes a significant change to the channel implementation by adding the `tryAcquire` logic.
@ZoomRmc it fixes a major issue with the non-blocking variant, namely that `trySend`/`tryRecv` could still block while acquiring the lock.
At least for me, it makes no sense to keep this limitation as the function's contract. Can you name a reason I may be missing?
Now that I re-read the warning, I believe it's incorrect to begin with: waiting is done only by the blocking send/recv calls, and that wait usually causes the lock to be released automatically by the wait function. So the `tryAcquire` change, if anything, should reduce the lock contention caused by `trySend`/`tryRecv` calls when the channel is full/empty.
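For illustration, a minimal self-contained sketch of the mechanism this refers to (not the module's actual code; the names `nonEmpty`, `queueLen`, and `blockingRecv` are made up): `wait` on a condition variable atomically releases the lock while the caller sleeps, so a blocked receiver holds no lock.

```nim
import std/locks

var
  lock: Lock
  nonEmpty: Cond
  queueLen = 0

initLock(lock)
initCond(nonEmpty)

proc blockingRecv() =
  acquire(lock)
  while queueLen == 0:
    # wait() releases `lock` while sleeping and re-acquires it
    # before returning, so a blocked receiver does not hold the lock.
    wait(nonEmpty, lock)
  dec queueLen
  release(lock)
```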
Here's what Claude 3.5 thinks about this change, after correcting it (it assumed that trySend/tryRecv used the condition variables before): https://claude.site/artifacts/fec48ef8-43da-4d49-949b-f3c94c305d21
Sorry, LLMs are useful for rubberducking, but they are laughably bad at reasoning about concurrency unless driven to consider all the edge cases by the user, at which point it most probably means the user has already gathered the mental model in their head and there's no point in providing the LLM's output. At the very least, not blocking on lock acquisition means you're probably trading one possible failure mode for another: deadlock for an infinite loop. I can't think of anything being wrong with the change off the cuff, but my gut tells me it needs careful consideration and discussion. Even if it's all sound, this is a significant change that needs to be thought through for multiple possible combinations of [try]send/[try]receive and numbers of senders and receivers. It would be proper to have the expected logic laid out in the PR description.
@ZoomRmc alright, I will run a benchmark later tonight to counter your criticism, and then I will have a better impression of this change. However, I am not sure I can test every variation that you require unless you can propose some existing benchmark.
I will update the PR's description. However, I am curious whether you can point me to another PR where your reasoning is explained in detail; if anything, I am interested in learning. Cheers!
Can you be more explicit on which part(s) of the artifact you disagree with, just so we're on the same page.
I'm just against using an LLM's output as supporting evidence for a point in a hard context in general. They are designed to produce extremely convincing and at the same time satisfying answers, just like any eager person with a huge confirmation bias.
Unfortunately, I don't think I really can. There are a few questions regarding the correctness of the implementation that I posed at different times (such as this "BTW" part). I also commented on a couple of issues going through some parts of the logic. However, I'm neither the author nor the expert. The current implementation mostly follows the original by @aprell and relies on his thesis being correct, plus gradual fixing of discovered bugs and the accompanying discussions happening in this repo. I don't really have any criticism at the moment, as I'm in a different headspace. (I might have some later, though :D) I'm just warning against making changes to the logic in a "test" PR so it doesn't fly under the radar of anyone who's interested in the module's implementation. In my opinion this requires splitting the PR and, perhaps, warrants its own issue.
Another part of the PR that changed in accordance with RFC #347 is the removal of the extract logic:

```nim
# Current implementation
# a sink parameter in loops creates copies
proc trySend*[T](c: Chan[T], src: sink Isolated[T]): bool {.inline.} =
  var data = src.extract # basically (bitwiseCopy data, src.value; wasMoved(src.value))
  result = channelSend(c.d, data.addr, sizeof(T), false)
  if result:
    wasMoved(data)
  # data needs to be destroyed

# Proposed changes
proc trySend*[T](c: Chan[T], src: var Isolated[T]): bool {.inline.} =
  result = channelSend(c.d, src.addr, sizeof(T), false) # no extract call
  if result:
    wasMoved(src)
  # no destructor call here as src is a var parameter
```

I also posted this on Discord for feedback, and I am reposting it here so it doesn't get lost. IMO it's correct, as my SPSC channel implementation in planetis/sync went through similar changes.
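A hedged usage sketch of the proposed overload (assuming this PR's `var Isolated[T]` signature; the retry loop is illustrative): on a failed send, `p` keeps ownership of the value, so retrying makes no copies.

```nim
import threading/channels, std/[isolation, atomics]

var chan = newChan[string](elements = 4)
var p = isolate("hello")
while not chan.trySend(p):
  cpuRelax()  # channel full: p still owns the value, just retry
# after a successful send, p has been moved from; don't reuse it
```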
@ZoomRmc from your comment in #27:
This is definitely a valid concern for using broadcast, or we might even need to eventually rethink how channels is implemented and reuse logic similar to https://github.com/planetis-m/sync/blob/f0174325d4c66e28475781bc0a436e7ab245acaf/tests/tsemaphore.nim
I see that trySend/tryRecv definitely follows the logic from that repo, but send/recv are not in there. And a big change is that there are TWO locks in that design, but here there's only one! So it has definitely changed significantly, and I don't see your point about a thesis from another project being correct. This is something I need to change in a follow-up PR.
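For illustration, a structural sketch of the difference pointed out above (type names are made up, taken from neither repo): a single lock makes producers and consumers contend with each other, while one lock per end does not.

```nim
import std/locks

type
  OneLockChan = object
    lock: Lock      # producers AND consumers contend on this
    # ... buffer, head, tail ...

  TwoLockChan = object
    sendLock: Lock  # producers contend only with other producers
    recvLock: Lock  # consumers contend only with other consumers
    # ... buffer, head, tail ...
```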
Alright, benchmark results.

Benchmark code:

```nim
import threading/channels, std/os, times, stats, cpuinfo, atomics

type
  Message = array[10, int]

const
  NumIterations = 1_000_000
  ChannelSize = 100

type
  ThreadArg = object
    chan: Chan[Message]
    iterations: int
    successCount: ptr Atomic[int]
    failureCount: ptr Atomic[int]

proc producerThread(arg: ThreadArg) {.thread.} =
  var msg: Message
  for i in 1..arg.iterations:
    msg[0] = i # Just to have some changing data
    if arg.chan.trySend(msg):
      discard arg.successCount[].fetchAdd(1)
    else:
      discard arg.failureCount[].fetchAdd(1)
    cpuRelax()

proc consumerThread(arg: ThreadArg) {.thread.} =
  var msg: Message
  for i in 1..arg.iterations:
    if arg.chan.tryRecv(msg):
      discard arg.successCount[].fetchAdd(1)
    else:
      discard arg.failureCount[].fetchAdd(1)
    cpuRelax()

proc runBenchmark(): (float, int, int, int, int) =
  let numThreads = countProcessors()
  let numProducers = numThreads div 2
  let numConsumers = numThreads - numProducers
  var chan = newChan[Message](elements = ChannelSize)
  var producers: seq[Thread[ThreadArg]]
  var consumers: seq[Thread[ThreadArg]]
  producers.setLen(numProducers)
  consumers.setLen(numConsumers)
  var sendSuccessCount, sendFailureCount, recvSuccessCount, recvFailureCount: Atomic[int]
  sendSuccessCount.store(0)
  sendFailureCount.store(0)
  recvSuccessCount.store(0)
  recvFailureCount.store(0)
  let iterationsPerThread = NumIterations div numThreads
  let producerArg = ThreadArg(
    chan: chan,
    iterations: iterationsPerThread,
    successCount: addr sendSuccessCount,
    failureCount: addr sendFailureCount
  )
  let consumerArg = ThreadArg(
    chan: chan,
    iterations: iterationsPerThread,
    successCount: addr recvSuccessCount,
    failureCount: addr recvFailureCount
  )
  let start = cpuTime()
  # Start producer threads
  for i in 0 ..< numProducers:
    createThread(producers[i], producerThread, producerArg)
  # Start consumer threads
  for i in 0 ..< numConsumers:
    createThread(consumers[i], consumerThread, consumerArg)
  # Wait for all threads to finish
  for thread in producers:
    joinThread(thread)
  for thread in consumers:
    joinThread(thread)
  let duration = cpuTime() - start
  return (duration, sendSuccessCount.load, sendFailureCount.load,
          recvSuccessCount.load, recvFailureCount.load)

when isMainModule:
  var benchmarkTimes: RunningStat
  let numRuns = 5
  let numThreads = countProcessors()
  let numProducers = numThreads div 2
  let numConsumers = numThreads - numProducers
  echo "Running MPMC benchmark using ", numThreads, " threads"
  echo "Producers: ", numProducers, ", Consumers: ", numConsumers
  echo "Channel size: ", ChannelSize
  echo "Message type: array[10, int]"
  echo "Each benchmark will run ", numRuns, " times"
  for i in 1..numRuns:
    echo "\nRun ", i
    let (time, sendSuccess, sendFailure, recvSuccess, recvFailure) = runBenchmark()
    benchmarkTimes.push time
    echo "Time: ", time, " seconds"
    echo "trySend success: ", sendSuccess, ", failure: ", sendFailure
    echo "trySend success rate: ", (sendSuccess.float / (sendSuccess + sendFailure).float * 100), "%"
    echo "tryRecv success: ", recvSuccess, ", failure: ", recvFailure
    echo "tryRecv success rate: ", (recvSuccess.float / (recvSuccess + recvFailure).float * 100), "%"
    sleep(100) # Short pause between runs
  echo "\nMPMC Benchmark Results:"
  echo "Mean time: ", benchmarkTimes.mean, " seconds"
  echo "Standard deviation: ", benchmarkTimes.standardDeviation, " seconds"
```

Results without the changes vs. changing `acquire` to `tryAcquire` (8 threads: 4 producers, 4 consumers; channel size 100; message type array[10, int]; 5 runs each; per-run numbers omitted): the most noticeable change is the sharp increase in failure rate. Will investigate further tomorrow.
Well, the other benefit of a lock is being able to wait on it, leveraging the OS instead of constantly polling. Some thoughts to consider: why does one need guaranteed non-blocking ops anyway? I think it's not just so you can burn cycles in a loop, but perhaps to be able to further manipulate the task that uses the channel: for example, cancel it or set a timeout limit. However, this module does not provide any means for that; the user needs to implement it either by bolting on some additional synchronization mechanism to the channel or by using a signal sent via the channel itself (see the sketch below). So, what possible cases really suffer from blocking on lock acquisition? I suspect that such scenarios will be much less common for the use-cases of this module than what your benchmark demonstrates: simply trying in a loop. So, to me it looks like a tradeoff in which using this module as a building block for a more complex abstraction will possibly benefit, but the basic use-case will be at some degree of performance disadvantage. It's late and I'm probably missing something, but I hope I'm not totally off.
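To make this concrete, here is a sketch of such a bolted-on timeout; `trySendWithTimeout` is a hypothetical helper built on top of the module's `trySend`, not part of its API:

```nim
import threading/channels, std/[monotimes, times, atomics]

proc trySendWithTimeout[T](c: Chan[T], msg: T, timeout: Duration): bool =
  ## Keeps retrying a non-blocking send until the deadline passes.
  ## Note: the template-based trySend copies msg on each attempt;
  ## the var Isolated[T] overload from this PR would avoid that.
  let start = getMonoTime()
  while getMonoTime() - start < timeout:
    if c.trySend(msg):
      return true
    cpuRelax()
  false

# Usage: give up after 50 ms if the channel stays full.
var chan = newChan[int](elements = 2)
discard chan.trySendWithTimeout(42, initDuration(milliseconds = 50))
```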
Another benchmark, measuring throughput.

Benchmark code:

```nim
import threading/channels, std/os, times, stats, cpuinfo, atomics, strutils, sequtils

type
  Message = array[10, int]

const
  BenchmarkDuration = 5.0 # seconds
  ChannelSize = 100

type
  ThreadArg = object
    chan: Chan[Message]
    runningFlag: ptr Atomic[bool]
    successCount: ptr Atomic[int]
    failureCount: ptr Atomic[int]

proc producerThread(arg: ThreadArg) {.thread.} =
  var msg: Message
  var i = 0
  while arg.runningFlag[].load(moRelaxed):
    inc i
    msg[0] = i # Just to have some changing data
    if arg.chan.trySend(msg):
      discard arg.successCount[].fetchAdd(1)
    else:
      discard arg.failureCount[].fetchAdd(1)
    cpuRelax()

proc consumerThread(arg: ThreadArg) {.thread.} =
  var msg: Message
  while arg.runningFlag[].load(moRelaxed):
    if arg.chan.tryRecv(msg):
      discard arg.successCount[].fetchAdd(1)
    else:
      discard arg.failureCount[].fetchAdd(1)
    cpuRelax()

proc runBenchmark(): (float, int, int, int, int) =
  let numThreads = countProcessors()
  let numProducers = numThreads div 2
  let numConsumers = numThreads - numProducers
  var chan = newChan[Message](elements = ChannelSize)
  var producers: seq[Thread[ThreadArg]]
  var consumers: seq[Thread[ThreadArg]]
  producers.setLen(numProducers)
  consumers.setLen(numConsumers)
  var runningFlag: Atomic[bool]
  runningFlag.store(true)
  var sendSuccessCount, sendFailureCount, recvSuccessCount, recvFailureCount: Atomic[int]
  sendSuccessCount.store(0)
  sendFailureCount.store(0)
  recvSuccessCount.store(0)
  recvFailureCount.store(0)
  let producerArg = ThreadArg(
    chan: chan,
    runningFlag: addr runningFlag,
    successCount: addr sendSuccessCount,
    failureCount: addr sendFailureCount
  )
  let consumerArg = ThreadArg(
    chan: chan,
    runningFlag: addr runningFlag,
    successCount: addr recvSuccessCount,
    failureCount: addr recvFailureCount
  )
  # Start producer threads
  for i in 0 ..< numProducers:
    createThread(producers[i], producerThread, producerArg)
  # Start consumer threads
  for i in 0 ..< numConsumers:
    createThread(consumers[i], consumerThread, consumerArg)
  # Run for fixed duration
  sleep(int(BenchmarkDuration * 1000))
  # Stop all threads
  runningFlag.store(false)
  # Wait for all threads to finish
  for thread in producers:
    joinThread(thread)
  for thread in consumers:
    joinThread(thread)
  return (BenchmarkDuration, sendSuccessCount.load, sendFailureCount.load,
          recvSuccessCount.load, recvFailureCount.load)

when isMainModule:
  var sendThroughput, recvThroughput: RunningStat
  let numRuns = 5
  let numThreads = countProcessors()
  let numProducers = numThreads div 2
  let numConsumers = numThreads - numProducers
  echo "Running MPMC throughput benchmark using ", numThreads, " threads"
  echo "Producers: ", numProducers, ", Consumers: ", numConsumers
  echo "Channel size: ", ChannelSize
  echo "Message type: array[10, int]"
  echo "Benchmark duration: ", BenchmarkDuration, " seconds"
  echo "Number of runs: ", numRuns
  for i in 1..numRuns:
    echo "\nRun ", i
    let (duration, sendSuccess, sendFailure, recvSuccess, recvFailure) = runBenchmark()
    let sendTotal = sendSuccess + sendFailure
    let recvTotal = recvSuccess + recvFailure
    let sendThroughputOps = sendTotal.float / duration
    let recvThroughputOps = recvTotal.float / duration
    sendThroughput.push sendThroughputOps
    recvThroughput.push recvThroughputOps
    echo "Send throughput: ", sendThroughputOps.formatFloat(ffDecimal, 2), " ops/s"
    echo "Send success rate: ", (sendSuccess.float / sendTotal.float * 100).formatFloat(ffDecimal, 2), "%"
    echo "Receive throughput: ", recvThroughputOps.formatFloat(ffDecimal, 2), " ops/s"
    echo "Receive success rate: ", (recvSuccess.float / recvTotal.float * 100).formatFloat(ffDecimal, 2), "%"
    sleep(100) # Short pause between runs
  echo "\nMPMC Benchmark Results:"
  echo "Mean send throughput: ", sendThroughput.mean.formatFloat(ffDecimal, 2), " ops/s"
  echo "Send throughput std dev: ", sendThroughput.standardDeviation.formatFloat(ffDecimal, 2), " ops/s"
  echo "Mean receive throughput: ", recvThroughput.mean.formatFloat(ffDecimal, 2), " ops/s"
  echo "Receive throughput std dev: ", recvThroughput.standardDeviation.formatFloat(ffDecimal, 2), " ops/s"
```

Results without the changes vs. changing `acquire` to `tryAcquire` (8 threads: 4 producers, 4 consumers; channel size 100; message type array[10, int]; 5 runs of 5.0 seconds each; per-run numbers omitted): notice the 4x-6x increase in throughput.
Well, no surprise you get such an increase if you count messages that weren't sent in the throughput. Counting only successfully sent/received messages (old vs. new, per-run numbers omitted), the new version shows a 9% decrease in throughput. The only change to the implementation is:

```nim
# channelSend
when not blocking:
  if chan.isFull() or not tryAcquire(chan.lock): return false
else:
  acquire(chan.lock)

# channelReceive
when not blocking:
  if chan.isEmpty() or not tryAcquire(chan.lock): return false
else:
  acquire(chan.lock)
```
EDIT: Adding exponential backoff makes the benchmark results indistinguishable. Both implementations reach a 98% success rate.

```nim
const
  InitialBackoff = 1  # milliseconds (sleep from std/os takes milliseconds)
  MaxBackoff = 16     # milliseconds

proc exponentialBackoff(backoff: var int) =
  if backoff < MaxBackoff:
    backoff *= 2
  sleep(backoff)
  backoff = max(backoff, InitialBackoff)

proc producerThread(arg: ThreadArg) {.thread.} =
  var msg: Message
  var backoff = InitialBackoff
  for i in 1..arg.iterations:
    msg[0] = i # Just to have some changing data
    if arg.chan.trySend(msg):
      discard arg.successCount[].fetchAdd(1)
      backoff = InitialBackoff # Reset backoff on success
    else:
      discard arg.failureCount[].fetchAdd(1)
      exponentialBackoff(backoff)
    cpuRelax()
```
@Araq it's your call. This PR is complete, please review.
There's still no reasoning provided for this change. For the basic case in your first benchmark, the throughput is getting worse. Another thing is that I suspect this can facilitate worse designs down the line. If your program spends any considerable time blocking on the lock, that means the provisioning (number of workers and channel size) is most probably off.
Review comment on `threading/channels.nim` (outdated):

```diff
@@ -286,39 +288,43 @@ proc `=copy`*[T](dest: var Chan[T], src: Chan[T]) =
     `=destroy`(dest)
   dest.d = src.d

-proc trySend*[T](c: Chan[T], src: sink Isolated[T]): bool {.inline.} =
+proc trySend*[T](c: Chan[T], src: var Isolated[T]): bool {.inline.} =
```
Why was this change necessary?
Because trySend can fail, and that means whatever was sunk must be destroyed. And if trySend is used in a loop, that creates copies; T might not even be copyable. This PR removes all possible copies.
But then `trySend(chan, isolate MyObject(...))` does not compile anymore. Maybe introduce `trySendMut` instead (or something with a better name).
Done, I added `trySendMut` (the name seems fine).
No, the name sucks as it stresses what happens but not why it happens. The name should be `trySendNocopy`, but then people (myself included) wonder why the `sink` does not ensure "no copy" already. What is really going on here?
There is no problem here with `sink` or copies per se. `sink` takes over ownership, always, and ideally `trySend` takes over ownership conditionally. As such, `var` works better, but one needs to watch out not to reuse the object afterwards. Maybe it should be named `tryTake`.
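A tiny sketch of that distinction (the proc names are made up for illustration): a `sink` parameter consumes its argument unconditionally, while a `var` parameter lets the callee take ownership only on success.

```nim
import std/isolation

proc sendSink[T](x: sink Isolated[T]): bool =
  # consumes x unconditionally: even on failure the caller's
  # value is destroyed when this proc returns
  false

proc sendVar[T](x: var Isolated[T]): bool =
  # would take ownership only on success; on failure the
  # caller still owns x and can retry
  false

var a = isolate("payload")
discard sendSink(move(a))  # a is gone, success or not
var b = isolate("payload")
if not sendVar(b):
  discard sendVar(b)       # b survived the failed attempt; retrying is fine
```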
In your previous comment you said that you noticed a decrease in throughput. The reasoning for this change I explained earlier in the comments; if you have something to contest, let's do it in good faith. I am willing to listen if you have some actual channels usage that's impacted by this change, not possible scenarios. Here's an idea: let's take the discussion to Discord. Maybe we can come up with something.
Review comment on `threading/channels.nim` (outdated):

```diff
@@ -187,8 +187,9 @@ proc channelSend(chan: ChannelRaw, data: pointer, size: int, blocking: static bool)

   when not blocking:
     if chan.isFull(): return false

-  acquire(chan.lock)
+  if not tryAcquire(chan.lock): return false
```
I'm confused. Didn't you say you removed this new logic again?
Sorry, the documentation states that this version of send doesn't block, so I assumed it would need to use a non-blocking version for acquiring the locks. I made two multiple-producers/multiple-consumers benchmarks and I see high contention. Notice that there's a single lock used by both producers and consumers, and I assume that's the primary reason for the high contention. Using `tryAcquire`, as expected, increases the number of attempted calls (4x-6x), but using `acquire` instead improves the ratio of successful calls (from 10% to 50%); however, the total number of successful calls remains about the same. I wonder if that's just a side-effect of the fairness guarantees that are implemented for normal locks. Then I implemented exponential backoff for both benchmarks. The success rates improved to 98%, so it's clearly the better mechanism to fix the issue. As such, I claim that `acquire` is not needed here; if anything, trySend/tryRecv now behave similarly to the same-named functions of a lock-free channel.
I haven't confirmed anything like that. As I said above, to get the idea we'd need to at the very least check a couple of other scenarios (one-to-many, many-to-one).
The only things I see that could pass as that are "it makes no sense to keep this limitation as the function's contract", the link to the RFC that's not accepted yet, and this comment from you.
Absolutely, I've laid out some of my concerns above openly and don't have any hidden motivation in any of the questions in previous comments.
It's your PR, so perhaps it should be the other way round.
This PR contains several important bug-fixes and API additions for channels, and there's no reason not to merge it. After all, the docs state:
My bad, I missed the "revert to plain acquire" part, which kept me from merging it.
Thank you very much!
This PR refactors the logic for the non-blocking channels as well as the relevant test. It changes the signature to `proc trySend*[T](c: Chan[T], src: var Isolated[T]): bool`, as discussed in the comments.