Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor channels.trySend/tryRecv and improve tests #74

Merged
merged 22 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions tests/tchannels_cooperative.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,50 +10,50 @@ const
sentmsg = "task sent"

type
Payload = tuple[chan: ptr Chan[int16], idx: int16]
Payload = tuple[chan: Chan[int16], idx: int16]

var
sentmessages = newSeqOfCap[string](NTasks)
receivedmessages = newSeqOfCap[int16](NTasks)

# A prototype of a task executing thread
proc runner(tasksCh: ptr Chan[Payload]) {.thread.} =
proc runner(tasksCh: Chan[Payload]) {.thread.} =
var p: Payload
while true:
tasksCh[].recv(p) # Get a message from the main thread
tasksCh.recv(p) # Get a message from the main thread
if p.idx == -1: break # Check for an ad hoc stop signal
else:
sleep(SleepDurationMS) # Hard work
p.chan[].send(p.idx) # Notify a consumer
p.chan.send(p.idx) # Notify a consumer

# A single thread receiving result from runner threads
proc consumer(args: tuple[resultsCh: ptr Chan[int16], tasks: int16]) {.thread.} =
proc consumer(args: tuple[resultsCh: Chan[int16], tasks: int16]) {.thread.} =
var idx: int16
for _ in 0..<args.tasks: # We know the number of tasks and wait for them all
args.resultsCh[].recv(idx)
args.resultsCh.recv(idx)
{.gcsafe.}: # Don't do this. Here we know it's an exclusive access
receivedmessages.add(idx) # Store which task was completed

proc main(chanSize: Natural) =
sentmessages.setLen(0)
receivedmessages.setLen(0)
var
taskThreads = newSeq[Thread[ptr Chan[Payload]]](countProcessors())
taskThreads = newSeq[Thread[Chan[Payload]]](countProcessors())
tasksCh = newChan[Payload](chanSize)
consumerTh: Thread[(ptr Chan[int16], int16)]
consumerTh: Thread[(Chan[int16], int16)]
resultsCh = newChan[int16](chanSize)

# Consumer must be ready first to not block
createThread(consumerTh, consumer, (resultsCh.addr, NTasks))
createThread(consumerTh, consumer, (resultsCh, NTasks))
# Start runner threads
for i in 0..high(taskThreads): createThread(taskThreads[i], runner, tasksCh.addr)
for i in 0..high(taskThreads): createThread(taskThreads[i], runner, tasksCh)
# Loop iterating fake data
for idx in 0'i16..<NTasks:
tasksCh.send((resultsCh.addr, idx))
for idx in 0'i16..<NTasks:
tasksCh.send((resultsCh, idx))
sentmessages.add(sentmsg)

for _ in taskThreads: # Stopping worker threads
tasksCh.send((resultsCh.addr, -1'i16)) # A thread can't get more than 1 stop signal
tasksCh.send((resultsCh, -1'i16)) # A thread can't get more than 1 stop signal
joinThreads(taskThreads)
joinThread(consumerTh)

Expand All @@ -69,10 +69,10 @@ template runTests(bufferSize: Positive) =
var set = {0..NTasks-1}
for i in receivedmessages: set.excl(i)
doAssert set == {}


block buffered_channels:
runTests(bufferSize = 2)
runTests(bufferSize = 2)

block unbuffered_channels:
runTests(bufferSize = 1)
44 changes: 32 additions & 12 deletions tests/tchannels_singlebuf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,60 @@
## https://github.com/nim-lang/threading/pull/27#issue-1652851878
## Also tests `trySend` and `tryRecv` templates.

import threading/channels
import threading/channels, std/os
const Message = "Hello"

block trySend_recv:
proc test(chan: ptr Chan[string]) {.thread.} =
var attempts = 0

proc test(chan: Chan[string]) {.thread.} =
var notSent = true
var msg = Message
let msg = Message
while notSent:
notSent = not chan[].trySend(msg)
notSent = not chan.trySend(msg)
if notSent:
atomicInc(attempts)

var chan = newChan[string](elements = 1)
var thread: Thread[ptr Chan[string]]
var dest: string
# Fill the channel before spawning the thread
chan.send("Dummy message")

var thread: Thread[Chan[string]]
createThread(thread, test, chan)
sleep 10

createThread(thread, test, chan.addr)
# Receive the dummy message to make room for the real message
discard chan.recv()

var dest: string
chan.recv(dest)
doAssert dest == Message

thread.joinThread()
doAssert attempts > 0, "trySend should have been attempted multiple times"


block send_tryRecv:
proc test(chan: ptr Chan[string]) {.thread.} =
var attempts = 0

proc test(chan: Chan[string]) {.thread.} =
var notReceived = true
var msg: string
while notReceived:
notReceived = not chan[].tryRecv(msg)
notReceived = not chan.tryRecv(msg)
if notReceived:
atomicInc(attempts)
doAssert msg == Message

var chan = newChan[string](elements = 1)
var thread: Thread[ptr Chan[string]]
let src = Message

createThread(thread, test, chan.addr)
var thread: Thread[Chan[string]]
createThread(thread, test, chan)
sleep 10

let src = Message
chan.send(src)

thread.joinThread()
doAssert attempts > 0, "tryRecv should have been attempted multiple times"

10 changes: 5 additions & 5 deletions tests/tsmartptrsleak.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import threading/smartptrs
import std/isolation
import std/locks
import threading/atomics
import std/atomics
import threading/channels

var
Expand All @@ -12,10 +12,10 @@ type

when defined(nimAllowNonVarDestructor):
proc `=destroy`(obj: TestObj) =
discard freeCounts.fetchAdd(1, Release)
discard freeCounts.fetchAdd(1, moRelease)
else:
proc `=destroy`(obj: var TestObj) =
discard freeCounts.fetchAdd(1, Release)
discard freeCounts.fetchAdd(1, moRelease)

var
thr: array[0..1, Thread[void]]
Expand Down Expand Up @@ -50,6 +50,6 @@ createThread(thr[0], threadA)
createThread(thr[1], threadB)
joinThreads(thr)

echo "freeCounts: got: ", $int(freeCounts), " expected: ", N
echo "freeCounts: got: ", load(freeCounts, moRelaxed), " expected: ", N
echo ""
assert freeCounts.load(Acquire) == N
assert freeCounts.load(moRelaxed) == N
85 changes: 53 additions & 32 deletions threading/channels.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
## the underlying resources and synchronization. It has to be initialized using
## the `newChan` proc. Sending and receiving operations are provided by the
## blocking `send` and `recv` procs, and non-blocking `trySend` and `tryRecv`
## procs. Send operations add messages to the channel, receiving operations
## procs. Send operations add messages to the channel, receiving operations
## remove them.
##
##
## See also:
## * [std/isolation](https://nim-lang.org/docs/isolation.html)
##
Expand Down Expand Up @@ -289,68 +289,90 @@ proc `=copy`*[T](dest: var Chan[T], src: Chan[T]) =
proc trySend*[T](c: Chan[T], src: sink Isolated[T]): bool {.inline.} =
## Tries to send the message `src` to the channel `c`.
##
## The memory of `src` is moved, not copied.
## The memory of `src` will be moved if possible.
## Doesn't block waiting for space in the channel to become available.
## Instead returns after an attempt to send a message was made.
##
## .. warning:: Blocking is still possible if another thread uses the blocking
## version of the `send proc`_ / `recv proc`_ and waits for the
## data/space to appear in the channel, thus holding the internal lock to
## channel's buffer.
##
## .. warning:: In high-concurrency situations, consider using an exponential
## backoff strategy to reduce contention and improve the success rate of
## operations.
##
## Returns `false` if the message was not sent because the number of pending
## messages in the channel exceeded its capacity.
var data = src.extract
result = channelSend(c.d, data.unsafeAddr, sizeof(T), false)
result = channelSend(c.d, src.addr, sizeof(T), false)
if result:
wasMoved(data)
wasMoved(src)

template trySend*[T](c: Chan[T], src: T): bool =
## Helper template for `trySend <#trySend,Chan[T],sinkIsolated[T]>`_.
##
## .. warning:: For repeated sends of the same value, consider using the
## `tryTake <#tryTake,Chan[T],varIsolated[T]>`_ proc with a pre-isolated
## value to avoid unnecessary copying.
mixin isolate
trySend(c, isolate(src))

proc tryTake*[T](c: Chan[T], src: var Isolated[T]): bool {.inline.} =
## Tries to send the message `src` to the channel `c`.
##
## The memory of `src` is moved directly. Be careful not to reuse `src` afterwards.
## This proc is suitable when `src` cannot be copied.
##
## Doesn't block waiting for space in the channel to become available.
## Instead returns after an attempt to send a message was made.
##
## .. warning:: In high-concurrency situations, consider using an exponential
## backoff strategy to reduce contention and improve the success rate of
## operations.
##
## Returns `false` if the message was not sent because the number of pending
## messages in the channel exceeded its capacity.
result = channelSend(c.d, src.addr, sizeof(T), false)
if result:
wasMoved(src)

proc tryRecv*[T](c: Chan[T], dst: var T): bool {.inline.} =
## Tries to receive a message from the channel `c` and fill `dst` with its value.
##
##
## Doesn't block waiting for messages in the channel to become available.
## Instead returns after an attempt to receive a message was made.
##
## .. warning:: Blocking is still possible if another thread uses the blocking
## version of the `send proc`_ / `recv proc`_ and waits for the data/space to
## appear in the channel, thus holding the internal lock to channel's buffer.
##
##
## .. warning:: In high-concurrency situations, consider using an exponential
## backoff strategy to reduce contention and improve the success rate of
## operations.
##
## Returns `false` and does not change `dist` if no message was received.
channelReceive(c.d, dst.addr, sizeof(T), false)

proc send*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} =
## Sends the message `src` to the channel `c`.
## Sends the message `src` to the channel `c`.
## This blocks the sending thread until `src` was successfully sent.
##
## The memory of `src` is moved, not copied.
##
##
## The memory of `src` is moved, not copied.
##
## If the channel is already full with messages this will block the thread until
## messages from the channel are removed.
var data = src.extract
when defined(gcOrc) and defined(nimSafeOrcSend):
GC_runOrc()
discard channelSend(c.d, data.unsafeAddr, sizeof(T), true)
wasMoved(data)
discard channelSend(c.d, src.addr, sizeof(T), true)
wasMoved(src)

template send*[T](c: Chan[T]; src: T) =
## Helper template for `send`.
mixin isolate
send(c, isolate(src))

proc recv*[T](c: Chan[T], dst: var T) {.inline.} =
## Receives a message from the channel `c` and fill `dst` with its value.
##
## Receives a message from the channel `c` and fill `dst` with its value.
##
## This blocks the receiving thread until a message was successfully received.
##
##
## If the channel does not contain any messages this will block the thread until
## a message get sent to the channel.
discard channelReceive(c.d, dst.addr, sizeof(T), true)

proc recv*[T](c: Chan[T]): T {.inline.} =
## Receives a message from the channel.
## Receives a message from the channel.
## A version of `recv`_ that returns the message.
discard channelReceive(c.d, result.addr, sizeof(result), true)

Expand All @@ -367,9 +389,8 @@ proc peek*[T](c: Chan[T]): int {.inline.} =

proc newChan*[T](elements: Positive = 30): Chan[T] =
## An initialization procedure, necessary for acquiring resources and
## initializing internal state of the channel.
##
## `elements` is the capacity of the channel and thus how many messages it can hold
## initializing internal state of the channel.
##
## `elements` is the capacity of the channel and thus how many messages it can hold
## before it refuses to accept any further messages.
assert elements >= 1, "Elements must be positive!"
result = Chan[T](d: allocChannel(sizeof(T), elements))
Loading