diff --git a/README.md b/README.md index 8b1ccf6..d52920b 100644 --- a/README.md +++ b/README.md @@ -747,6 +747,10 @@ func handler(msgs []*memphis.Msg, err error, ctx context.Context) { consumer.Consume(handler, memphis.ConsumerPartitionKey() // use the partition key to consume from a spacific partition (if not specified consume in a Round Robin fashion) ) + +consumer.Consume(handler, + memphis.ConsumerPartitionNumber() +) ``` #### Consumer schema deserialization @@ -780,6 +784,7 @@ msgs, err := conn.FetchMessages("", "", memphis.FetchStartConsumeFromSeq()// start consuming from a specific sequence. defaults to 1 memphis.FetchLastMessages()// consume the last N messages, defaults to -1 (all messages in the station)) memphis.FetchPartitionKey()// use the partition key to consume from a spacific partition (if not specified consume in a Round Robin fashion) +) ``` ### Fetch a single batch of messages after creating a consumer diff --git a/connect.go b/connect.go index a8ee7c0..b49358a 100644 --- a/connect.go +++ b/connect.go @@ -983,7 +983,7 @@ func (c *Conn) GetPartitionFromKey(key string, stationName string) (int, error) } func (c *Conn) ValidatePartitionNumber(partitionNumber int, stationName string) error { - if partitionNumber < 0 || partitionNumber >= len(c.stationPartitions[stationName].PartitionsList) { + if partitionNumber < 0 || partitionNumber > len(c.stationPartitions[stationName].PartitionsList) { return errors.New("Partition number is out of range") } for _, partition := range c.stationPartitions[stationName].PartitionsList {