Skip to content

Commit

Permalink
res
Browse files Browse the repository at this point in the history
  • Loading branch information
ejconlon committed Feb 18, 2024
1 parent 28be946 commit f4d5ef3
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 82 deletions.
71 changes: 4 additions & 67 deletions minipat-dirt/src/Minipat/Dirt/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,8 @@ module Minipat.Dirt.Core where
import Control.Concurrent (forkFinally)
import Control.Concurrent.Async (Async, async, cancel, poll, waitCatch)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, tryTakeMVar, withMVar)
import Control.Concurrent.STM (STM, atomically, retry)
import Control.Concurrent.STM.TQueue
( TQueue
, flushTQueue
, newTQueueIO
, peekTQueue
, readTQueue
, tryPeekTQueue
, writeTQueue
)
import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM.TQueue (TQueue, flushTQueue, newTQueueIO, writeTQueue)
import Control.Concurrent.STM.TVar (TVar, modifyTVar', newTVarIO, readTVar, readTVarIO, stateTVar, writeTVar)
import Control.Exception (Exception (..), SomeException, bracket, mask_, onException, throwIO)
import Control.Monad (unless, void, when)
Expand All @@ -24,7 +16,6 @@ import Dahdit.Network (Conn (..), HostPort (..), resolveAddr, runDecoder, runEnc
import Data.Acquire (Acquire)
import Data.Either (isRight)
import Data.Foldable (foldl', for_)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Ratio ((%))
Expand All @@ -34,8 +25,8 @@ import Data.Text (Text)
import Data.Text qualified as T
import Minipat.Dirt.Attrs (Attrs, attrsInsert)
import Minipat.Dirt.Logger (LogAction, logDebug, logError, logInfo, logWarn, newLogger)
import Minipat.Dirt.Osc (PlayEnv (..), PlayErr, Timed (..), convertTape, handshakePacket, playPacket)
import Minipat.Dirt.Resources (RelVar, acquireAsync, relVarAcquire, relVarDispose, relVarInit)
import Minipat.Dirt.Osc (PlayEnv (..), PlayErr, convertTape, handshakePacket, playPacket)
import Minipat.Dirt.Resources (RelVar, Timed (..), acquireAwait, acquireLoop, relVarAcquire, relVarDispose, relVarInit)
import Minipat.EStream (EStream (..))
import Minipat.Print (prettyPrint, prettyPrintAll, prettyShow, prettyShowAll)
import Minipat.Stream (Stream, streamRun, tapeToList)
Expand All @@ -44,66 +35,12 @@ import Nanotime
( PosixTime (..)
, TimeDelta
, TimeLike (..)
, awaitDelta
, threadDelayDelta
, timeDeltaFromFracSecs
)
import Network.Socket qualified as NS
import Prettyprinter (Pretty)

newtype NonPosTimeDeltaErr = NonPosTimeDeltaErr TimeDelta
deriving stock (Eq, Ord, Show)

instance Exception NonPosTimeDeltaErr

awaitTime :: TimeDelta -> IORef PosixTime -> IO PosixTime
awaitTime delta timeVar = do
lastTime <- readIORef timeVar
nextTime <-
if lastTime == PosixTime 0
then currentTime
else awaitDelta lastTime delta
writeIORef timeVar nextTime
pure nextTime

acquireLoop :: TVar TimeDelta -> (PosixTime -> IO ()) -> Acquire (Async ())
acquireLoop deltaVar act = do
timeVar <- liftIO (newIORef (PosixTime 0))
let act' = do
delta <- readTVarIO deltaVar
unless (delta > 0) (throwIO (NonPosTimeDeltaErr delta))
time <- awaitTime delta timeVar
act time
act'
acquireAsync act'

acquireAwait :: TVar Bool -> TQueue (Timed a) -> (Timed a -> IO ()) -> Acquire (Async ())
acquireAwait runVar queue act =
let act' = do
-- Peek at the first entry and await it
time <- atomically $ do
run <- readTVar runVar
if run
then fmap timedKey (peekTQueue queue)
else retry
now <- currentTime @PosixTime
threadDelayDelta (diffTime time now)
-- If it's still there (not cleared), act on it
mtimed <- atomically $ do
run <- readTVar runVar
if run
then do
mtimed <- tryPeekTQueue queue
case mtimed of
Just timed
| timedKey timed == time ->
mtimed <$ readTQueue queue
_ -> pure Nothing
else pure Nothing
maybe (pure ()) act mtimed
act'
in acquireAsync act'

data Env = Env
{ envTargetHp :: !HostPort
, envListenHp :: !HostPort
Expand Down
15 changes: 2 additions & 13 deletions minipat-dirt/src/Minipat/Dirt/Osc.hs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
{-# LANGUAGE OverloadedStrings #-}

module Minipat.Dirt.Osc
( Timed (..)
, PlayErr (..)
( PlayErr (..)
, PlayEnv (..)
, convertEvent
, convertTape
Expand All @@ -21,20 +20,10 @@ import Data.Sequence (Seq (..))
import Data.Sequence qualified as Seq
import Data.Text (Text)
import Minipat.Dirt.Attrs (Attrs, IsAttrs (..), attrsDelete, attrsInsert, attrsLookup, attrsToList)
import Minipat.Dirt.Resources (Timed (..))
import Minipat.Stream (Ev (..), Tape, tapeToList)
import Minipat.Time (CycleDelta (..), CycleTime (..), Span, spanCycle, spanDelta)
import Nanotime (PosixTime (..), TimeDelta (..), addTime, timeDeltaFromFracSecs, timeDeltaToNanos)
import Prettyprinter (Pretty (..))
import Prettyprinter qualified as P

data Timed a = Timed
{ timedKey :: !PosixTime
, timedVal :: !a
}
deriving stock (Eq, Ord, Show, Functor, Foldable, Traversable)

instance (Pretty a) => Pretty (Timed a) where
pretty (Timed k v) = P.hsep [pretty (unPosixTime k), pretty v]

namedPayload :: Attrs -> Seq Datum
namedPayload = foldl' go Empty . attrsToList
Expand Down
76 changes: 75 additions & 1 deletion minipat-dirt/src/Minipat/Dirt/Resources.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,26 @@ module Minipat.Dirt.Resources
, relVarAcquire
, withRelVar
, acquireAsync
, acquireLoop
, Timed (..)
, acquireAwait
)
where

import Control.Concurrent.Async (Async, async, cancel)
import Control.Exception (bracket, mask)
import Control.Concurrent.STM (atomically, retry)
import Control.Concurrent.STM.TQueue (TQueue, peekTQueue, readTQueue, tryPeekTQueue)
import Control.Concurrent.STM.TVar (TVar, readTVar, readTVarIO)
import Control.Exception (Exception, bracket, mask, throwIO)
import Control.Monad (unless)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (InternalState, closeInternalState, createInternalState)
import Control.Monad.Trans.Resource.Internal (registerType)
import Data.Acquire.Internal (Acquire (..), Allocated (..), mkAcquire)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Nanotime (PosixTime (..), TimeDelta, awaitDelta, currentTime, diffTime, threadDelayDelta)
import Prettyprinter (Pretty (..))
import Prettyprinter qualified as P

type RelVar = InternalState

Expand All @@ -33,3 +45,65 @@ withRelVar = bracket relVarInit relVarDispose

acquireAsync :: IO a -> Acquire (Async a)
acquireAsync act = mkAcquire (async act) cancel

newtype NonPosTimeDeltaErr = NonPosTimeDeltaErr TimeDelta
deriving stock (Eq, Ord, Show)

instance Exception NonPosTimeDeltaErr

awaitTime :: TimeDelta -> IORef PosixTime -> IO PosixTime
awaitTime delta timeVar = do
lastTime <- readIORef timeVar
nextTime <-
if lastTime == PosixTime 0
then currentTime
else awaitDelta lastTime delta
writeIORef timeVar nextTime
pure nextTime

acquireLoop :: TVar TimeDelta -> (PosixTime -> IO ()) -> Acquire (Async ())
acquireLoop deltaVar act = do
timeVar <- liftIO (newIORef (PosixTime 0))
let act' = do
delta <- readTVarIO deltaVar
unless (delta > 0) (throwIO (NonPosTimeDeltaErr delta))
time <- awaitTime delta timeVar
act time
act'
acquireAsync act'

data Timed a = Timed
{ timedKey :: !PosixTime
, timedVal :: !a
}
deriving stock (Eq, Ord, Show, Functor, Foldable, Traversable)

instance (Pretty a) => Pretty (Timed a) where
pretty (Timed k v) = P.hsep [pretty (unPosixTime k), pretty v]

acquireAwait :: TVar Bool -> TQueue (Timed a) -> (Timed a -> IO ()) -> Acquire (Async ())
acquireAwait runVar queue act =
let act' = do
-- Peek at the first entry and await it
time <- atomically $ do
run <- readTVar runVar
if run
then fmap timedKey (peekTQueue queue)
else retry
now <- currentTime @PosixTime
threadDelayDelta (diffTime time now)
-- If it's still there (not cleared), act on it
mtimed <- atomically $ do
run <- readTVar runVar
if run
then do
mtimed <- tryPeekTQueue queue
case mtimed of
Just timed
| timedKey timed == time ->
mtimed <$ readTQueue queue
_ -> pure Nothing
else pure Nothing
maybe (pure ()) act mtimed
act'
in acquireAsync act'
3 changes: 2 additions & 1 deletion minipat-dirt/src/Minipat/Dirt/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import Minipat.Dirt.Core
, setTempo
, withSt
)
import Minipat.Dirt.Osc (PlayEnv (..), PlayErr, Timed (..), convertTape, handshakePacket, playPacket)
import Minipat.Dirt.Osc (PlayEnv (..), PlayErr, convertTape, handshakePacket, playPacket)
import Minipat.Dirt.Resources (Timed (..))
import Minipat.Stream (Ev (..), tapeSingleton)
import Minipat.Time (Arc (..), Span (..))
import Nanotime (TimeLike (..), threadDelayDelta, timeDeltaFromFracSecs)
Expand Down

0 comments on commit f4d5ef3

Please sign in to comment.