Skip to content

Commit

Permalink
Merge pull request #74 from planetis-m/chan
Browse files Browse the repository at this point in the history
refactor channels.trySend/tryRecv and improve tests
  • Loading branch information
Araq authored Aug 26, 2024
2 parents 6fbc0df + 36317e5 commit 5501a4a
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 65 deletions.
32 changes: 16 additions & 16 deletions tests/tchannels_cooperative.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,50 +10,50 @@ const
sentmsg = "task sent"

type
Payload = tuple[chan: ptr Chan[int16], idx: int16]
Payload = tuple[chan: Chan[int16], idx: int16]

var
sentmessages = newSeqOfCap[string](NTasks)
receivedmessages = newSeqOfCap[int16](NTasks)

# A prototype of a task executing thread
proc runner(tasksCh: ptr Chan[Payload]) {.thread.} =
proc runner(tasksCh: Chan[Payload]) {.thread.} =
var p: Payload
while true:
tasksCh[].recv(p) # Get a message from the main thread
tasksCh.recv(p) # Get a message from the main thread
if p.idx == -1: break # Check for an ad hoc stop signal
else:
sleep(SleepDurationMS) # Hard work
p.chan[].send(p.idx) # Notify a consumer
p.chan.send(p.idx) # Notify a consumer

# A single thread receiving result from runner threads
proc consumer(args: tuple[resultsCh: ptr Chan[int16], tasks: int16]) {.thread.} =
proc consumer(args: tuple[resultsCh: Chan[int16], tasks: int16]) {.thread.} =
var idx: int16
for _ in 0..<args.tasks: # We know the number of tasks and wait for them all
args.resultsCh[].recv(idx)
args.resultsCh.recv(idx)
{.gcsafe.}: # Don't do this. Here we know it's an exclusive access
receivedmessages.add(idx) # Store which task was completed

proc main(chanSize: Natural) =
sentmessages.setLen(0)
receivedmessages.setLen(0)
var
taskThreads = newSeq[Thread[ptr Chan[Payload]]](countProcessors())
taskThreads = newSeq[Thread[Chan[Payload]]](countProcessors())
tasksCh = newChan[Payload](chanSize)
consumerTh: Thread[(ptr Chan[int16], int16)]
consumerTh: Thread[(Chan[int16], int16)]
resultsCh = newChan[int16](chanSize)

# Consumer must be ready first to not block
createThread(consumerTh, consumer, (resultsCh.addr, NTasks))
createThread(consumerTh, consumer, (resultsCh, NTasks))
# Start runner threads
for i in 0..high(taskThreads): createThread(taskThreads[i], runner, tasksCh.addr)
for i in 0..high(taskThreads): createThread(taskThreads[i], runner, tasksCh)
# Loop iterating fake data
for idx in 0'i16..<NTasks:
tasksCh.send((resultsCh.addr, idx))
for idx in 0'i16..<NTasks:
tasksCh.send((resultsCh, idx))
sentmessages.add(sentmsg)

for _ in taskThreads: # Stopping worker threads
tasksCh.send((resultsCh.addr, -1'i16)) # A thread can't get more than 1 stop signal
tasksCh.send((resultsCh, -1'i16)) # A thread can't get more than 1 stop signal
joinThreads(taskThreads)
joinThread(consumerTh)

Expand All @@ -69,10 +69,10 @@ template runTests(bufferSize: Positive) =
var set = {0..NTasks-1}
for i in receivedmessages: set.excl(i)
doAssert set == {}


block buffered_channels:
runTests(bufferSize = 2)
runTests(bufferSize = 2)

block unbuffered_channels:
runTests(bufferSize = 1)
44 changes: 32 additions & 12 deletions tests/tchannels_singlebuf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,60 @@
## https://github.com/nim-lang/threading/pull/27#issue-1652851878
## Also tests `trySend` and `tryRecv` templates.

import threading/channels
import threading/channels, std/os
const Message = "Hello"

block trySend_recv:
proc test(chan: ptr Chan[string]) {.thread.} =
var attempts = 0

proc test(chan: Chan[string]) {.thread.} =
var notSent = true
var msg = Message
let msg = Message
while notSent:
notSent = not chan[].trySend(msg)
notSent = not chan.trySend(msg)
if notSent:
atomicInc(attempts)

var chan = newChan[string](elements = 1)
var thread: Thread[ptr Chan[string]]
var dest: string
# Fill the channel before spawning the thread
chan.send("Dummy message")

var thread: Thread[Chan[string]]
createThread(thread, test, chan)
sleep 10

createThread(thread, test, chan.addr)
# Receive the dummy message to make room for the real message
discard chan.recv()

var dest: string
chan.recv(dest)
doAssert dest == Message

thread.joinThread()
doAssert attempts > 0, "trySend should have been attempted multiple times"


block send_tryRecv:
proc test(chan: ptr Chan[string]) {.thread.} =
var attempts = 0

proc test(chan: Chan[string]) {.thread.} =
var notReceived = true
var msg: string
while notReceived:
notReceived = not chan[].tryRecv(msg)
notReceived = not chan.tryRecv(msg)
if notReceived:
atomicInc(attempts)
doAssert msg == Message

var chan = newChan[string](elements = 1)
var thread: Thread[ptr Chan[string]]
let src = Message

createThread(thread, test, chan.addr)
var thread: Thread[Chan[string]]
createThread(thread, test, chan)
sleep 10

let src = Message
chan.send(src)

thread.joinThread()
doAssert attempts > 0, "tryRecv should have been attempted multiple times"

10 changes: 5 additions & 5 deletions tests/tsmartptrsleak.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import threading/smartptrs
import std/isolation
import std/locks
import threading/atomics
import std/atomics
import threading/channels

var
Expand All @@ -12,10 +12,10 @@ type

when defined(nimAllowNonVarDestructor):
proc `=destroy`(obj: TestObj) =
discard freeCounts.fetchAdd(1, Release)
discard freeCounts.fetchAdd(1, moRelease)
else:
proc `=destroy`(obj: var TestObj) =
discard freeCounts.fetchAdd(1, Release)
discard freeCounts.fetchAdd(1, moRelease)

var
thr: array[0..1, Thread[void]]
Expand Down Expand Up @@ -50,6 +50,6 @@ createThread(thr[0], threadA)
createThread(thr[1], threadB)
joinThreads(thr)

echo "freeCounts: got: ", $int(freeCounts), " expected: ", N
echo "freeCounts: got: ", load(freeCounts, moRelaxed), " expected: ", N
echo ""
assert freeCounts.load(Acquire) == N
assert freeCounts.load(moRelaxed) == N
85 changes: 53 additions & 32 deletions threading/channels.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
## the underlying resources and synchronization. It has to be initialized using
## the `newChan` proc. Sending and receiving operations are provided by the
## blocking `send` and `recv` procs, and non-blocking `trySend` and `tryRecv`
## procs. Send operations add messages to the channel, receiving operations
## procs. Send operations add messages to the channel, receiving operations
## remove them.
##
##
## See also:
## * [std/isolation](https://nim-lang.org/docs/isolation.html)
##
Expand Down Expand Up @@ -289,68 +289,90 @@ proc `=copy`*[T](dest: var Chan[T], src: Chan[T]) =
proc trySend*[T](c: Chan[T], src: sink Isolated[T]): bool {.inline.} =
## Tries to send the message `src` to the channel `c`.
##
## The memory of `src` is moved, not copied.
## The memory of `src` will be moved if possible.
## Doesn't block waiting for space in the channel to become available.
## Instead returns after an attempt to send a message was made.
##
## .. warning:: Blocking is still possible if another thread uses the blocking
## version of the `send proc`_ / `recv proc`_ and waits for the
## data/space to appear in the channel, thus holding the internal lock to
## channel's buffer.
##
## .. warning:: In high-concurrency situations, consider using an exponential
## backoff strategy to reduce contention and improve the success rate of
## operations.
##
## Returns `false` if the message was not sent because the number of pending
## messages in the channel exceeded its capacity.
var data = src.extract
result = channelSend(c.d, data.unsafeAddr, sizeof(T), false)
result = channelSend(c.d, src.addr, sizeof(T), false)
if result:
wasMoved(data)
wasMoved(src)

template trySend*[T](c: Chan[T], src: T): bool =
## Helper template for `trySend <#trySend,Chan[T],sinkIsolated[T]>`_.
##
## .. warning:: For repeated sends of the same value, consider using the
## `tryTake <#tryTake,Chan[T],varIsolated[T]>`_ proc with a pre-isolated
## value to avoid unnecessary copying.
mixin isolate
trySend(c, isolate(src))

proc tryTake*[T](c: Chan[T], src: var Isolated[T]): bool {.inline.} =
## Tries to send the message `src` to the channel `c`.
##
## The memory of `src` is moved directly. Be careful not to reuse `src` afterwards.
## This proc is suitable when `src` cannot be copied.
##
## Doesn't block waiting for space in the channel to become available.
## Instead returns after an attempt to send a message was made.
##
## .. warning:: In high-concurrency situations, consider using an exponential
## backoff strategy to reduce contention and improve the success rate of
## operations.
##
## Returns `false` if the message was not sent because the number of pending
## messages in the channel exceeded its capacity.
result = channelSend(c.d, src.addr, sizeof(T), false)
if result:
wasMoved(src)

proc tryRecv*[T](c: Chan[T], dst: var T): bool {.inline.} =
## Tries to receive a message from the channel `c` and fill `dst` with its value.
##
##
## Doesn't block waiting for messages in the channel to become available.
## Instead returns after an attempt to receive a message was made.
##
## .. warning:: Blocking is still possible if another thread uses the blocking
## version of the `send proc`_ / `recv proc`_ and waits for the data/space to
## appear in the channel, thus holding the internal lock to channel's buffer.
##
##
## .. warning:: In high-concurrency situations, consider using an exponential
## backoff strategy to reduce contention and improve the success rate of
## operations.
##
## Returns `false` and does not change `dist` if no message was received.
channelReceive(c.d, dst.addr, sizeof(T), false)

proc send*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} =
## Sends the message `src` to the channel `c`.
## Sends the message `src` to the channel `c`.
## This blocks the sending thread until `src` was successfully sent.
##
## The memory of `src` is moved, not copied.
##
##
## The memory of `src` is moved, not copied.
##
## If the channel is already full with messages this will block the thread until
## messages from the channel are removed.
var data = src.extract
when defined(gcOrc) and defined(nimSafeOrcSend):
GC_runOrc()
discard channelSend(c.d, data.unsafeAddr, sizeof(T), true)
wasMoved(data)
discard channelSend(c.d, src.addr, sizeof(T), true)
wasMoved(src)

template send*[T](c: Chan[T]; src: T) =
## Helper template for `send`.
mixin isolate
send(c, isolate(src))

proc recv*[T](c: Chan[T], dst: var T) {.inline.} =
## Receives a message from the channel `c` and fill `dst` with its value.
##
## Receives a message from the channel `c` and fill `dst` with its value.
##
## This blocks the receiving thread until a message was successfully received.
##
##
## If the channel does not contain any messages this will block the thread until
## a message get sent to the channel.
discard channelReceive(c.d, dst.addr, sizeof(T), true)

proc recv*[T](c: Chan[T]): T {.inline.} =
## Receives a message from the channel.
## Receives a message from the channel.
## A version of `recv`_ that returns the message.
discard channelReceive(c.d, result.addr, sizeof(result), true)

Expand All @@ -367,9 +389,8 @@ proc peek*[T](c: Chan[T]): int {.inline.} =

proc newChan*[T](elements: Positive = 30): Chan[T] =
## An initialization procedure, necessary for acquiring resources and
## initializing internal state of the channel.
##
## `elements` is the capacity of the channel and thus how many messages it can hold
## initializing internal state of the channel.
##
## `elements` is the capacity of the channel and thus how many messages it can hold
## before it refuses to accept any further messages.
assert elements >= 1, "Elements must be positive!"
result = Chan[T](d: allocChannel(sizeof(T), elements))

0 comments on commit 5501a4a

Please sign in to comment.