diff --git a/eth/filters/api.go b/eth/filters/api.go index 30d7b71c31..83c1d3ba30 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -107,20 +107,16 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { pendingTxs = make(chan []common.Hash) pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) ) - + f := &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub} api.filtersMu.Lock() - api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub} + api.filters[pendingTxSub.ID] = f api.filtersMu.Unlock() go func() { for { select { case ph := <-pendingTxs: - api.filtersMu.Lock() - if f, found := api.filters[pendingTxSub.ID]; found { - f.hashes = append(f.hashes, ph...) - } - api.filtersMu.Unlock() + f.hashes = append(f.hashes, ph...) case <-pendingTxSub.Err(): api.filtersMu.Lock() delete(api.filters, pendingTxSub.ID) diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index a105ec51c3..ca360ec86a 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -174,6 +174,17 @@ func (sub *Subscription) Unsubscribe() { // this ensures that the manager won't use the event channel which // will probably be closed by the client asap after this method returns. <-sub.Err() + + drainLoop: + for { + select { + case <-sub.f.logs: + case <-sub.f.hashes: + case <-sub.f.headers: + default: + break drainLoop + } + } }) }