Skip to content

Commit

Permalink
global refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
ergoz committed Jun 24, 2019
1 parent 44196e8 commit 4de38d3
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 96 deletions.
46 changes: 46 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
language: go

#dist: trusty
#sudo: false

os:
- linux

go:
- 1.11.x
- 1.12.x
- tip

env:
- GO111MODULE=on

matrix:
allow_failures:
- go: tip

#branches:
# only:
# - master
# - develop
# - /^v*.*.*$/

notifications:
email:
on_success: change
on_failure: always

install:
- go get -d
- go get -u golang.org/x/lint/golint
- go get -u golang.org/x/tools/cmd/goimports
- go get -u golang.org/x/tools/cmd/cover
- go get -u github.com/mattn/goveralls

before_script:

script:
- go vet ./...
- diff <(goimports -d .) <(printf "")
- diff <(golint ./...) <(printf "")
- go test -v -covermode=count -coverprofile=coverage.out
- goveralls -coverprofile=coverage.out -service=travis-ci -repotoken $COVERALLS_TOKEN
25 changes: 15 additions & 10 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package workercore

import (
"time"

"github.com/riftbit/golif"

"github.com/streadway/amqp"
)

// Configuration is basic configureation for worker
const (
defaultAsyncPoolSize = 1
)

// Configuration is basic configuration for worker
type Configuration struct {
AsyncWorker bool
AsyncPoolSize int32
Expand All @@ -28,22 +31,24 @@ type Configuration struct {
DefaultRetryDelay time.Duration
}

// ProcessFunction
// service field needs to pass application data, services, configs, etc.
type ProcessFunction func(service interface{}, amqpMSG *amqp.Delivery) (retryCnt int32, retryDelay time.Duration, err error)
// RunnableConsumer interface for processing function
type RunnableConsumer interface {
ProcessQueueTask(amqpMSG *amqp.Delivery) (retryCnt int32, retryDelay time.Duration, err error)
}

// Worker basic client
type Worker struct {
config *Configuration
logger golif.Logger
amqpConnection *amqp.Connection
amqpChannel *amqp.Channel
amqpQueue amqp.Queue
amqpQueueDelayed amqp.Queue
amqpQueueDelayedName string
amqpMessages <-chan amqp.Delivery
amqpNotifyCloseConnection chan *amqp.Error
amqpQueueDelayedName string
shutdownCh chan bool
processFunction ProcessFunction
externalService interface{}
errorCh chan error
amqpMessagesPoolCh chan *amqp.Delivery
runnerForProcessor func(*amqp.Delivery)
processorStorage RunnableConsumer
}
13 changes: 13 additions & 0 deletions helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package workercore

import (
"github.com/streadway/amqp"
)

func (wrk *Worker) processorRunnerAsync(msg *amqp.Delivery) {
wrk.amqpMessagesPoolCh <- msg
}

func (wrk *Worker) processorRunnerSync(msg *amqp.Delivery) {
wrk.processMessage(msg)
}
24 changes: 5 additions & 19 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,23 @@ import (
)

func (wrk *Worker) processMessage(amqpMSG *amqp.Delivery) {

retryCnt, retryDelay, err := wrk.processFunction(wrk.externalService, amqpMSG)

retryCnt, retryDelay, err := wrk.processorStorage.ProcessQueueTask(amqpMSG)
if err != nil {

if wrk.config.UseDelayedQueue {

var repeatCnt int32

if retryCnt != 0 {
repeatCnt = retryCnt
} else {
repeatCnt = wrk.config.DefaultRetryCount
}

if repeatCnt > 0 {

var currentTryId int32
var nextTryId int32

if amqpMSG.Headers == nil {
amqpMSG.Headers = make(map[string]interface{})
}

if n, ok := amqpMSG.Headers["x-retry-id"].(int32); ok {
currentTryId = n
nextTryId = currentTryId + 1
Expand All @@ -54,35 +47,28 @@ func (wrk *Worker) processMessage(amqpMSG *amqp.Delivery) {

err = wrk.repubToDelayed(amqpMSG, nextTryId, delay)
if err != nil {
wrk.logger.Errorf("cannot repub with delay: %v", err)
} else {
wrk.logger.Infof("sended repub with that had delivery tag: %d and next try id %d and delay %d", amqpMSG.DeliveryTag, nextTryId, delay)
wrk.errorCh <- fmt.Errorf("cannot repub with delay: %v", err)
}
}
}
}

err = amqpMSG.Nack(false, false)
if err != nil {
wrk.logger.Errorf("cannot send NAck: %v", err)
} else {
wrk.logger.Infof("deleted as unsuccess message with delivery tag %d", amqpMSG.DeliveryTag)
wrk.errorCh <- fmt.Errorf("cannot send NAck: %v", err)
}

return
}

err = amqpMSG.Ack(false)
if err != nil {
wrk.logger.Errorf("cannot send Ack for delivery tag $d: %v", amqpMSG.DeliveryTag, err)
wrk.errorCh <- fmt.Errorf("cannot send Ack for delivery tag $d: %v", amqpMSG.DeliveryTag, err)
return
}
}

func (wrk *Worker) repubToDelayed(amqpMSG *amqp.Delivery, retryID int32, delay time.Duration) error {

wrk.logger.Debugf("republishing to %s with delay %dms and retry-id %d", wrk.amqpQueueDelayedName, delay.Nanoseconds()/1e6, retryID)

return wrk.amqpChannel.Publish("", wrk.amqpQueueDelayedName, false, false, amqp.Publishing{
Headers: amqpMSG.Headers,
ContentType: amqpMSG.ContentType,
Expand All @@ -91,7 +77,7 @@ func (wrk *Worker) repubToDelayed(amqpMSG *amqp.Delivery, retryID int32, delay t
Priority: amqpMSG.Priority,
CorrelationId: amqpMSG.CorrelationId,
ReplyTo: amqpMSG.ReplyTo,
Expiration: fmt.Sprintf("%d", delay.Nanoseconds()/1e6),
Expiration: fmt.Sprintf("%d", int64(delay/time.Millisecond)),
MessageId: amqpMSG.MessageId,
Timestamp: time.Now(),
Type: amqpMSG.Type,
Expand Down
Loading

0 comments on commit 4de38d3

Please sign in to comment.