diff --git a/tests/tchannels_cooperative.nim b/tests/tchannels_cooperative.nim index d15250e..2ddcb6c 100644 --- a/tests/tchannels_cooperative.nim +++ b/tests/tchannels_cooperative.nim @@ -10,27 +10,27 @@ const sentmsg = "task sent" type - Payload = tuple[chan: ptr Chan[int16], idx: int16] + Payload = tuple[chan: Chan[int16], idx: int16] var sentmessages = newSeqOfCap[string](NTasks) receivedmessages = newSeqOfCap[int16](NTasks) # A prototype of a task executing thread -proc runner(tasksCh: ptr Chan[Payload]) {.thread.} = +proc runner(tasksCh: Chan[Payload]) {.thread.} = var p: Payload while true: - tasksCh[].recv(p) # Get a message from the main thread + tasksCh.recv(p) # Get a message from the main thread if p.idx == -1: break # Check for an ad hoc stop signal else: sleep(SleepDurationMS) # Hard work - p.chan[].send(p.idx) # Notify a consumer + p.chan.send(p.idx) # Notify a consumer # A single thread receiving result from runner threads -proc consumer(args: tuple[resultsCh: ptr Chan[int16], tasks: int16]) {.thread.} = +proc consumer(args: tuple[resultsCh: Chan[int16], tasks: int16]) {.thread.} = var idx: int16 for _ in 0.. 0, "trySend should have been attempted multiple times" block send_tryRecv: - proc test(chan: ptr Chan[string]) {.thread.} = + var attempts = 0 + + proc test(chan: Chan[string]) {.thread.} = var notReceived = true var msg: string while notReceived: - notReceived = not chan[].tryRecv(msg) + notReceived = not chan.tryRecv(msg) + if notReceived: + atomicInc(attempts) doAssert msg == Message var chan = newChan[string](elements = 1) - var thread: Thread[ptr Chan[string]] - let src = Message - createThread(thread, test, chan.addr) + var thread: Thread[Chan[string]] + createThread(thread, test, chan) + sleep 10 + + let src = Message chan.send(src) thread.joinThread() + doAssert attempts > 0, "tryRecv should have been attempted multiple times" + diff --git a/tests/tsmartptrsleak.nim b/tests/tsmartptrsleak.nim index b70ae7d..3a0bc7f 100644 --- a/tests/tsmartptrsleak.nim +++ b/tests/tsmartptrsleak.nim @@ -1,7 +1,7 @@ import threading/smartptrs import std/isolation import std/locks -import threading/atomics +import std/atomics import threading/channels var @@ -12,10 +12,10 @@ type when defined(nimAllowNonVarDestructor): proc `=destroy`(obj: TestObj) = - discard freeCounts.fetchAdd(1, Release) + discard freeCounts.fetchAdd(1, moRelease) else: proc `=destroy`(obj: var TestObj) = - discard freeCounts.fetchAdd(1, Release) + discard freeCounts.fetchAdd(1, moRelease) var thr: array[0..1, Thread[void]] @@ -50,6 +50,6 @@ createThread(thr[0], threadA) createThread(thr[1], threadB) joinThreads(thr) -echo "freeCounts: got: ", $int(freeCounts), " expected: ", N +echo "freeCounts: got: ", load(freeCounts, moRelaxed), " expected: ", N echo "" -assert freeCounts.load(Acquire) == N +assert freeCounts.load(moRelaxed) == N diff --git a/threading/channels.nim b/threading/channels.nim index eab2f32..db68c34 100644 --- a/threading/channels.nim +++ b/threading/channels.nim @@ -28,9 +28,9 @@ ## the underlying resources and synchronization. It has to be initialized using ## the `newChan` proc. Sending and receiving operations are provided by the ## blocking `send` and `recv` procs, and non-blocking `trySend` and `tryRecv` -## procs. Send operations add messages to the channel, receiving operations +## procs. Send operations add messages to the channel, receiving operations ## remove them. -## +## ## See also: ## * [std/isolation](https://nim-lang.org/docs/isolation.html) ## @@ -289,68 +289,90 @@ proc `=copy`*[T](dest: var Chan[T], src: Chan[T]) = proc trySend*[T](c: Chan[T], src: sink Isolated[T]): bool {.inline.} = ## Tries to send the message `src` to the channel `c`. ## - ## The memory of `src` is moved, not copied. + ## The memory of `src` will be moved if possible. ## Doesn't block waiting for space in the channel to become available. ## Instead returns after an attempt to send a message was made. - ## - ## .. warning:: Blocking is still possible if another thread uses the blocking - ## version of the `send proc`_ / `recv proc`_ and waits for the - ## data/space to appear in the channel, thus holding the internal lock to - ## channel's buffer. + ## + ## .. warning:: In high-concurrency situations, consider using an exponential + ## backoff strategy to reduce contention and improve the success rate of + ## operations. ## ## Returns `false` if the message was not sent because the number of pending ## messages in the channel exceeded its capacity. - var data = src.extract - result = channelSend(c.d, data.unsafeAddr, sizeof(T), false) + result = channelSend(c.d, src.addr, sizeof(T), false) if result: - wasMoved(data) + wasMoved(src) template trySend*[T](c: Chan[T], src: T): bool = ## Helper template for `trySend <#trySend,Chan[T],sinkIsolated[T]>`_. + ## + ## .. warning:: For repeated sends of the same value, consider using the + ## `tryTake <#tryTake,Chan[T],varIsolated[T]>`_ proc with a pre-isolated + ## value to avoid unnecessary copying. + mixin isolate trySend(c, isolate(src)) +proc tryTake*[T](c: Chan[T], src: var Isolated[T]): bool {.inline.} = + ## Tries to send the message `src` to the channel `c`. + ## + ## The memory of `src` is moved directly. Be careful not to reuse `src` afterwards. + ## This proc is suitable when `src` cannot be copied. + ## + ## Doesn't block waiting for space in the channel to become available. + ## Instead returns after an attempt to send a message was made. + ## + ## .. warning:: In high-concurrency situations, consider using an exponential + ## backoff strategy to reduce contention and improve the success rate of + ## operations. + ## + ## Returns `false` if the message was not sent because the number of pending + ## messages in the channel exceeded its capacity. + result = channelSend(c.d, src.addr, sizeof(T), false) + if result: + wasMoved(src) + proc tryRecv*[T](c: Chan[T], dst: var T): bool {.inline.} = ## Tries to receive a message from the channel `c` and fill `dst` with its value. - ## + ## ## Doesn't block waiting for messages in the channel to become available. ## Instead returns after an attempt to receive a message was made. - ## - ## .. warning:: Blocking is still possible if another thread uses the blocking - ## version of the `send proc`_ / `recv proc`_ and waits for the data/space to - ## appear in the channel, thus holding the internal lock to channel's buffer. - ## + ## + ## .. warning:: In high-concurrency situations, consider using an exponential + ## backoff strategy to reduce contention and improve the success rate of + ## operations. + ## ## Returns `false` and does not change `dist` if no message was received. channelReceive(c.d, dst.addr, sizeof(T), false) proc send*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} = - ## Sends the message `src` to the channel `c`. + ## Sends the message `src` to the channel `c`. ## This blocks the sending thread until `src` was successfully sent. - ## - ## The memory of `src` is moved, not copied. - ## + ## + ## The memory of `src` is moved, not copied. + ## ## If the channel is already full with messages this will block the thread until ## messages from the channel are removed. - var data = src.extract when defined(gcOrc) and defined(nimSafeOrcSend): GC_runOrc() - discard channelSend(c.d, data.unsafeAddr, sizeof(T), true) - wasMoved(data) + discard channelSend(c.d, src.addr, sizeof(T), true) + wasMoved(src) template send*[T](c: Chan[T]; src: T) = ## Helper template for `send`. + mixin isolate send(c, isolate(src)) proc recv*[T](c: Chan[T], dst: var T) {.inline.} = - ## Receives a message from the channel `c` and fill `dst` with its value. - ## + ## Receives a message from the channel `c` and fill `dst` with its value. + ## ## This blocks the receiving thread until a message was successfully received. - ## + ## ## If the channel does not contain any messages this will block the thread until ## a message get sent to the channel. discard channelReceive(c.d, dst.addr, sizeof(T), true) proc recv*[T](c: Chan[T]): T {.inline.} = - ## Receives a message from the channel. + ## Receives a message from the channel. ## A version of `recv`_ that returns the message. discard channelReceive(c.d, result.addr, sizeof(result), true) @@ -367,9 +389,8 @@ proc peek*[T](c: Chan[T]): int {.inline.} = proc newChan*[T](elements: Positive = 30): Chan[T] = ## An initialization procedure, necessary for acquiring resources and - ## initializing internal state of the channel. - ## - ## `elements` is the capacity of the channel and thus how many messages it can hold + ## initializing internal state of the channel. + ## + ## `elements` is the capacity of the channel and thus how many messages it can hold ## before it refuses to accept any further messages. - assert elements >= 1, "Elements must be positive!" result = Chan[T](d: allocChannel(sizeof(T), elements))