diff --git a/api.go b/api.go index 8d6d7811..4dd05aa8 100644 --- a/api.go +++ b/api.go @@ -65,6 +65,12 @@ type PushConsumer interface { // Unsubscribe a topic Unsubscribe(topic string) error + + // Suspend the consumption + Suspend() + + // Resume the consumption + Resume() } func NewPushConsumer(opts ...consumer.Option) (PushConsumer, error) { diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 4ad5ee36..37b1f4a1 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -255,6 +255,14 @@ func (pc *pushConsumer) Unsubscribe(topic string) error { return nil } +func (pc *pushConsumer) Suspend() { + pc.suspend() +} + +func (pc *pushConsumer) Resume() { + pc.resume() +} + func (pc *pushConsumer) Rebalance() { pc.defaultConsumer.doBalance() }