From a62064ef649da5e6760277b19a13b1753f5f8847 Mon Sep 17 00:00:00 2001 From: kritzcreek Date: Thu, 15 Feb 2018 16:37:07 +0100 Subject: [PATCH] Moves back to using a List of AVars for the listeners --- src/Control/Monad/Aff/Bus.purs | 43 ++++++++++++++++++++++++++-------- test/Main.purs | 7 +++--- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/src/Control/Monad/Aff/Bus.purs b/src/Control/Monad/Aff/Bus.purs index ae7334c..e09e118 100644 --- a/src/Control/Monad/Aff/Bus.purs +++ b/src/Control/Monad/Aff/Bus.purs @@ -1,5 +1,5 @@ {- -Copyright 2016 SlamData, Inc. +Copyright 2018 SlamData, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -30,13 +30,18 @@ module Control.Monad.Aff.Bus ) where import Prelude -import Control.Monad.Aff (Aff, Error) -import Control.Monad.Aff.AVar (AVar, AVAR, makeEmptyVar, takeVar, tryPutVar, readVar, killVar) + +import Control.Monad.Aff (Aff, attempt, forkAff) +import Control.Monad.Aff.AVar (AVAR, AVar, killVar, makeEmptyVar, makeVar, putVar, takeVar) +import Control.Monad.Eff.Exception as Exn +import Data.Foldable (foldl, sequence_, traverse_) +import Data.List (List, (:)) +import Data.Monoid (mempty) import Data.Tuple (Tuple(..)) data Cap -newtype Bus (r ∷ # Type) a = Bus (AVar a) +data Bus (r ∷ # Type) a = Bus (AVar a) (AVar (List (AVar a))) type BusR = BusR' () @@ -50,20 +55,38 @@ type BusRW = Bus (read ∷ Cap, write ∷ Cap) -- | Creates a new bidirectional Bus which can be read from and written to. make ∷ ∀ eff a. Aff (avar ∷ AVAR | eff) (BusRW a) -make = Bus <$> makeEmptyVar +make = do + cell ← makeEmptyVar + consumers ← makeVar mempty + let + loop = do + attempt (takeVar cell) >>= traverse_ \res → do + vars ← takeVar consumers + putVar mempty consumers + sequence_ (foldl (\xs a → putVar res a : xs) mempty vars) + loop + _ ← forkAff loop + pure $ Bus cell consumers -- | Blocks until a new value is pushed to the Bus, returning the value. read ∷ ∀ eff a r. BusR' r a → Aff (avar ∷ AVAR | eff) a -read (Bus avar) = readVar avar +read (Bus _ consumers) = do + res' ← makeEmptyVar + cs ← takeVar consumers + putVar (res' : cs) consumers + takeVar res' -- | Pushes a new value to the Bus, yieldig immediately. write ∷ ∀ eff a r. a → BusW' r a → Aff (avar ∷ AVAR | eff) Unit -write a (Bus avar) = tryPutVar a avar *> void (takeVar avar) +write a (Bus cell _) = putVar a cell -- | Splits a bidirectional Bus into separate read and write Buses. split ∷ ∀ a. BusRW a → Tuple (BusR a) (BusW a) -split (Bus avar) = Tuple (Bus avar) (Bus avar) +split (Bus a b) = Tuple (Bus a b) (Bus a b) -- | Kills the Bus and propagates the exception to all consumers. -kill ∷ ∀ eff a r. Error → BusW' r a → Aff (avar ∷ AVAR | eff) Unit -kill err (Bus avar) = killVar err avar +kill ∷ ∀ eff a r. Exn.Error → BusW' r a → Aff (avar ∷ AVAR | eff) Unit +kill err (Bus cell consumers) = do + killVar err cell + vars ← takeVar consumers + traverse_ (killVar err) vars diff --git a/test/Main.purs b/test/Main.purs index a289caf..cbc43c3 100644 --- a/test/Main.purs +++ b/test/Main.purs @@ -1,12 +1,12 @@ {- -Copyright 2016 SlamData, Inc. +Copyright 2018 SlamData, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,7 +17,8 @@ limitations under the License. module Test.Main where import Prelude -import Control.Monad.Aff (Aff, forkAff, launchAff, joinFiber, attempt) + +import Control.Monad.Aff (Aff, attempt, forkAff, joinFiber, launchAff) import Control.Monad.Aff.AVar (AVAR) import Control.Monad.Aff.Bus as Bus import Control.Monad.Aff.Console (log)