Skip to content

Commit

Permalink
Merge pull request #6 from slamdata/fancy-new-old-bus
Browse files Browse the repository at this point in the history
Moves back to using a List of AVars for the listeners
  • Loading branch information
kritzcreek authored Feb 19, 2018
2 parents 835181a + a62064e commit 3eb7a91
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 13 deletions.
43 changes: 33 additions & 10 deletions src/Control/Monad/Aff/Bus.purs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{-
Copyright 2016 SlamData, Inc.
Copyright 2018 SlamData, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,13 +30,18 @@ module Control.Monad.Aff.Bus
) where

import Prelude
import Control.Monad.Aff (Aff, Error)
import Control.Monad.Aff.AVar (AVar, AVAR, makeEmptyVar, takeVar, tryPutVar, readVar, killVar)

import Control.Monad.Aff (Aff, attempt, forkAff)
import Control.Monad.Aff.AVar (AVAR, AVar, killVar, makeEmptyVar, makeVar, putVar, takeVar)
import Control.Monad.Eff.Exception as Exn
import Data.Foldable (foldl, sequence_, traverse_)
import Data.List (List, (:))
import Data.Monoid (mempty)
import Data.Tuple (Tuple(..))

data Cap

newtype Bus (r ∷ # Type) a = Bus (AVar a)
data Bus (r ∷ # Type) a = Bus (AVar a) (AVar (List (AVar a)))

type BusR = BusR' ()

Expand All @@ -50,20 +55,38 @@ type BusRW = Bus (read ∷ Cap, write ∷ Cap)

-- | Creates a new bidirectional Bus which can be read from and written to.
make eff a. Aff (avar AVAR | eff) (BusRW a)
make = Bus <$> makeEmptyVar
make = do
cell ← makeEmptyVar
consumers ← makeVar mempty
let
loop = do
attempt (takeVar cell) >>= traverse_ \res → do
vars ← takeVar consumers
putVar mempty consumers
sequence_ (foldl (\xs a → putVar res a : xs) mempty vars)
loop
_ ← forkAff loop
pure $ Bus cell consumers

-- | Blocks until a new value is pushed to the Bus, returning the value.
read eff a r. BusR' r a Aff (avar AVAR | eff) a
read (Bus avar) = readVar avar
read (Bus _ consumers) = do
res' ← makeEmptyVar
cs ← takeVar consumers
putVar (res' : cs) consumers
takeVar res'

-- | Pushes a new value to the Bus, yieldig immediately.
write eff a r. a BusW' r a Aff (avar AVAR | eff) Unit
write a (Bus avar) = tryPutVar a avar *> void (takeVar avar)
write a (Bus cell _) = putVar a cell

-- | Splits a bidirectional Bus into separate read and write Buses.
split a. BusRW a Tuple (BusR a) (BusW a)
split (Bus avar) = Tuple (Bus avar) (Bus avar)
split (Bus a b) = Tuple (Bus a b) (Bus a b)

-- | Kills the Bus and propagates the exception to all consumers.
kill eff a r. Error BusW' r a Aff (avar AVAR | eff) Unit
kill err (Bus avar) = killVar err avar
kill eff a r. Exn.Error BusW' r a Aff (avar AVAR | eff) Unit
kill err (Bus cell consumers) = do
killVar err cell
vars ← takeVar consumers
traverse_ (killVar err) vars
7 changes: 4 additions & 3 deletions test/Main.purs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
{-
Copyright 2016 SlamData, Inc.
Copyright 2018 SlamData, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -17,7 +17,8 @@ limitations under the License.
module Test.Main where

import Prelude
import Control.Monad.Aff (Aff, forkAff, launchAff, joinFiber, attempt)

import Control.Monad.Aff (Aff, attempt, forkAff, joinFiber, launchAff)
import Control.Monad.Aff.AVar (AVAR)
import Control.Monad.Aff.Bus as Bus
import Control.Monad.Aff.Console (log)
Expand Down

0 comments on commit 3eb7a91

Please sign in to comment.