diff --git a/service.go b/service.go index e7c7f25..0a02d2a 100644 --- a/service.go +++ b/service.go @@ -66,9 +66,12 @@ func (s *DefaultService) Subscribe(ctx context.Context, topic string) (*subscrip s.subsMu.Lock() defer s.subsMu.Unlock() - defer log.Debugw("subscribed to topic") + log.Debugw("creating topic if new") ps := s.createSubIfNew(topic) - return ps.subscribe(ctx), nil + + log.Debugw("creating subscription") + sub := ps.subscribe(ctx) + return sub, nil } func (s *DefaultService) createSubIfNew(topic string) *pubsub { @@ -88,16 +91,29 @@ func (s *DefaultService) createSubIfNew(topic string) *pubsub { } func (s *DefaultService) Barrier(ctx context.Context, state string, target int) error { + log := log.With("state", state, "target", target) + log.Debugw("subscribing to topic") + if target == 0 { - log.Warnw("requested a barrier with target zero; satisfying immediately", "state", state) + log.Warnw("requested a barrier with target zero; satisfying immediately") return nil } s.barriersMu.Lock() + log.Debugw("creating state if new") barrier := s.createBarrierIfNew(state) s.barriersMu.Unlock() - return barrier.wait(ctx, target) + log.Debugw("waiting for barrier") + err := barrier.wait(ctx, target) + + if err != nil { + log.Debugw("barrier errored", "err", err) + } else { + log.Debugw("barrier satisfied") + } + + return err } func (s *DefaultService) SignalEntry(ctx context.Context, state string) (int, error) {