Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix kill after write issue #17

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

safareli
Copy link
Contributor

@safareli safareli commented Sep 21, 2019

This approach fixes #11 i.e. if you have fibers reading from bus and you have kill immediately after write to the bus, then the write will be missed. I was able to fix that, but using same AVar.put on cell as write is doing and the kill is happening in the loop forked on initialization of the bus.

I have added some comments to the code but if something is not clear or seams odd let me know.

@erisco
Copy link

erisco commented Sep 21, 2019

The original implementation has many bugs. Just an example:

-- | Blocks until a new value is pushed to the Bus, returning the value.
read ∷ ∀ a r. BusR' r a → Aff a
read (Bus _ consumers) = do
  res' ← AVar.empty
  cs ← AVar.take consumers
  -- *** Kill here leaves consumers empty ***
  AVar.put (res' : cs) consumers
  AVar.take res'

Also, I suspect there is a conceptual problem with consumers. If a consumer is merely an actor doing the read effect, then any time at which the actor is not reading it is also not a consumer.

Lets say a particular actor just reads, then logs what was read, then starts again. While the actor is reading it is a consumer, but while the actor is logging it is not a consumer. Therefore, messages written while the actor is logging will not be read by that actor.

Is that what anyone is expecting of bus? What is actually the expectation? What is meant by "consumer"?

@safareli
Copy link
Contributor Author

safareli commented Sep 21, 2019

I think that bug you outlined can be fixed like this:

generalBracket (pure unit)
    { killed: const pure
    , failed: const pure
    , completed: \cs _ → AVar.put (res' : cs) consumers
    }
    (AVar.take consumers)

(UPDATE: not really, as if take is successful this fiber can still be killed such that completed will not be called, the purescript-contrib/purescript-aff#169 pr is supposed to fix this kind of issues and allow this snippet to actually work)

consumer now represent actors currently blocked by read.
I've been using it usually like this:

forever $ Bus.read >>= \res -> forkAff do ...

but I understand that in some case it might miss some values tho i haven't noticed that yet.

@safareli
Copy link
Contributor Author

This is what's actually needed I think:

-- | Blocks until a new value is pushed to the Bus, returning the value.
read ∷ ∀ a r. BusR' r a → Aff a
read bus = consume bus $ Done >>> pure

data Step res = Loop | Done res

-- | Registers a new consumer on the bus. Blocks until the consumer returns
-- | `Done x` and then computation is resolved with the `x`. 
-- | if Effect raises an exception it will be propagated
consume ∷ ∀ a r output. BusR' r a → (a → Effect (Step output)) → Aff output

This way consuming function can never miss anything. using just that I was able to implement:

-- | Same as `consume` but consuming function returns Aff instead of Effect.
-- | When consuming function is not completed and new value comes in, it gets
-- | killed and is invoked with new value.
consumeLatest ∷ ∀ a r output. BusR' r a → (a → Aff (Step output)) → Aff output

@thomashoneyman thomashoneyman changed the base branch from master to main October 4, 2020 01:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

Avoid delay before kill?
2 participants