Skip to content

Commit

Permalink
✨ Add workq redis implemention
Browse files Browse the repository at this point in the history
  • Loading branch information
tosone committed Sep 20, 2023
1 parent b5558b0 commit b72c6c6
Show file tree
Hide file tree
Showing 23 changed files with 342 additions and 129 deletions.
2 changes: 1 addition & 1 deletion cmd/imports/cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
package imports

import (
_ "github.com/go-sigma/sigma/pkg/cronjob"
_ "github.com/go-sigma/sigma/pkg/cronjob/builder"
)
7 changes: 0 additions & 7 deletions cmd/imports/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,3 @@
// limitations under the License.

package imports

import (
_ "github.com/go-sigma/sigma/pkg/modules/locker/database"
_ "github.com/go-sigma/sigma/pkg/modules/locker/redis"
_ "github.com/go-sigma/sigma/pkg/modules/workq/database"
_ "github.com/go-sigma/sigma/pkg/modules/workq/kafka"
)
5 changes: 4 additions & 1 deletion conf/config-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ cache:
workqueue:
# the workqueue type available: redis, kafka, database
type: database
redis: {}
redis:
concurrency: 10
kafka: {}
database: {}

locker:
# the locker type available: redis, database
Expand Down
4 changes: 4 additions & 0 deletions conf/config-full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ cache:
workqueue:
# the workqueue type available: redis, kafka, database
type: redis
redis:
concurrency: 10
kafka: {}
database: {}

locker:
# the locker type available: redis, database
Expand Down
3 changes: 2 additions & 1 deletion conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ cache:
workqueue:
# the workqueue type available: redis, kafka, database
type: redis
redis: {}
redis:
concurrency: 10
kafka: {}
database: {}

Expand Down
15 changes: 14 additions & 1 deletion pkg/configs/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,22 @@ type ConfigurationCache struct {
Database ConfigurationCacheDatabase `yaml:"database"`
}

type ConfigurationWorkQueueRedis struct {
Concurrency int `yaml:"concurrency"`
}

type ConfigurationWorkQueueDatabase struct {
}

type ConfigurationWorkQueueKafka struct {
}

// ConfigurationWorkQueue ...
type ConfigurationWorkQueue struct {
Type enums.WorkQueueType `yaml:"type"`
Type enums.WorkQueueType `yaml:"type"`
Redis ConfigurationWorkQueueRedis `yaml:"redis"`
Database ConfigurationWorkQueueDatabase `yaml:"database"`
Kafka ConfigurationWorkQueueKafka `yaml:"kafka"`
}

// ConfigurationLocker ...
Expand Down
34 changes: 19 additions & 15 deletions pkg/cronjob/builder.go → pkg/cronjob/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ import (
"context"
"time"

"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
"github.com/robfig/cron/v3"
"github.com/rs/zerolog/log"

"github.com/go-sigma/sigma/pkg/consts"
"github.com/go-sigma/sigma/pkg/cronjob"
"github.com/go-sigma/sigma/pkg/daemon"
"github.com/go-sigma/sigma/pkg/dal"
"github.com/go-sigma/sigma/pkg/dal/dao"
"github.com/go-sigma/sigma/pkg/dal/query"
"github.com/go-sigma/sigma/pkg/modules/locker"
"github.com/go-sigma/sigma/pkg/modules/timewheel"
"github.com/go-sigma/sigma/pkg/service/builder"
"github.com/go-sigma/sigma/pkg/types"
Expand All @@ -39,16 +38,16 @@ import (
var builderTw timewheel.TimeWheel

func init() {
starter = append(starter, builderJob)
stopper = append(stopper, func() {
cronjob.Starter = append(cronjob.Starter, builderJob)
cronjob.Stopper = append(cronjob.Stopper, func() {
if builderTw != nil {
builderTw.Stop()
}
})
}

func builderJob() {
builderTw = timewheel.NewTimeWheel(context.Background(), cronjobIterDuration)
builderTw = timewheel.NewTimeWheel(context.Background(), cronjob.CronjobIterDuration)

runner := builderRunner{
builderServiceFactory: dao.NewBuilderServiceFactory(),
Expand All @@ -61,21 +60,26 @@ type builderRunner struct {
}

func (r builderRunner) runner(ctx context.Context, tw timewheel.TimeWheel) {
rs := redsync.New(goredis.NewPool(dal.RedisCli))
mutex := rs.NewMutex(consts.LockerCronjobBuilder, redsync.WithRetryDelay(time.Second*3), redsync.WithTries(10), redsync.WithExpiry(time.Second*30))
err := mutex.Lock()
locker, err := locker.New()
if err != nil {
log.Error().Err(err).Msg("Require redis lock failed")
log.Error().Err(err).Msg("New locker failed")
return
}
lock, err := locker.Lock(context.Background(), consts.LockerMigration, time.Second*30)
if err != nil {
log.Error().Err(err).Msg("Cronjob builder get locker failed")
return
}
defer func() {
if ok, err := mutex.Unlock(); !ok || err != nil {
log.Error().Err(err).Msg("Release redis lock failed")
err := lock.Unlock()
if err != nil {
log.Error().Err(err).Msg("Migrate locker release failed")
}
}()

ctx = log.Logger.WithContext(ctx)
builderService := r.builderServiceFactory.New()
builderObjs, err := builderService.GetByNextTrigger(ctx, time.Now(), maxJob)
builderObjs, err := builderService.GetByNextTrigger(ctx, time.Now(), cronjob.MaxJob)
if err != nil {
log.Error().Err(err).Msg("Get builders by next trigger failed")
return
Expand Down Expand Up @@ -125,7 +129,7 @@ func (r builderRunner) runner(ctx context.Context, tw timewheel.TimeWheel) {
log.Error().Interface("builder", builderObj).Err(err).Msg("Cronjob create builder runner failed")
}
}
if len(builderObjs) >= maxJob {
tw.TickNext(tickNextDuration)
if len(builderObjs) >= cronjob.MaxJob {
tw.TickNext(cronjob.TickNextDuration)
}
}
14 changes: 8 additions & 6 deletions pkg/cronjob/cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,22 @@ import "time"

const (
// cronjobIterDuration each job iterate duration
cronjobIterDuration = time.Second * 30
CronjobIterDuration = time.Second * 30
// tickNextDuration tick the next runner if current get full of jobs
tickNextDuration = time.Second * 3
TickNextDuration = time.Second * 3
// maxJob each iterate get the maximum jobs
maxJob = 100
MaxJob = 100
)

var starter []func()
// Starter ...
var Starter []func()

var stopper []func()
// Stopper ...
var Stopper []func()

// Initialize ...
func Initialize() {
for _, start := range starter {
for _, start := range Starter {
start()
}
}
3 changes: 2 additions & 1 deletion pkg/daemon/sbom/sbom.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ import (
"github.com/go-sigma/sigma/pkg/daemon"
"github.com/go-sigma/sigma/pkg/dal/models"
"github.com/go-sigma/sigma/pkg/modules/workq"
"github.com/go-sigma/sigma/pkg/modules/workq/definition"
"github.com/go-sigma/sigma/pkg/types/enums"
"github.com/go-sigma/sigma/pkg/utils"
"github.com/go-sigma/sigma/pkg/utils/compress"
)

func init() {
workq.TopicConsumers[enums.DaemonSbom.String()] = workq.Consumer{
workq.TopicHandlers[enums.DaemonSbom.String()] = definition.Consumer{
Handler: daemon.DecoratorArtifact(runner),
MaxRetry: 6,
Concurrency: 10,
Expand Down
3 changes: 2 additions & 1 deletion pkg/daemon/vulnerability/vulnerability.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ import (
"github.com/go-sigma/sigma/pkg/daemon"
"github.com/go-sigma/sigma/pkg/dal/models"
"github.com/go-sigma/sigma/pkg/modules/workq"
"github.com/go-sigma/sigma/pkg/modules/workq/definition"
"github.com/go-sigma/sigma/pkg/types/enums"
"github.com/go-sigma/sigma/pkg/utils"
"github.com/go-sigma/sigma/pkg/utils/compress"
)

func init() {
workq.TopicConsumers[enums.DaemonVulnerability.String()] = workq.Consumer{
workq.TopicHandlers[enums.DaemonVulnerability.String()] = definition.Consumer{
Handler: daemon.DecoratorArtifact(runner),
MaxRetry: 6,
Concurrency: 10,
Expand Down
14 changes: 14 additions & 0 deletions pkg/dal/dao/cache.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 sigma
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dao

import (
Expand Down
14 changes: 14 additions & 0 deletions pkg/dal/models/cache.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 sigma
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package models

import (
Expand Down
14 changes: 14 additions & 0 deletions pkg/modules/cacher/definition/definition.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 sigma
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package definition

import (
Expand Down
14 changes: 14 additions & 0 deletions pkg/modules/locker/definition/definition.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 sigma
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package definition

import (
Expand Down
18 changes: 5 additions & 13 deletions pkg/modules/workq/database/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ package database
import (
"context"
"errors"
"path"
"reflect"
"strings"
"time"

Expand All @@ -28,20 +26,14 @@ import (

"github.com/go-sigma/sigma/pkg/configs"
"github.com/go-sigma/sigma/pkg/dal/dao"
"github.com/go-sigma/sigma/pkg/modules/workq"
"github.com/go-sigma/sigma/pkg/modules/workq/definition"
"github.com/go-sigma/sigma/pkg/types/enums"
)

func init() {
workq.ConsumerClientFactories[path.Base(reflect.TypeOf(consumerFactory{}).PkgPath())] = &consumerFactory{}
}

type consumerFactory struct{}

// NewWorkQueueConsumer ...
func (f consumerFactory) New(_ configs.Configuration) error {
for topic, c := range workq.TopicConsumers {
go func(consumer workq.Consumer, topic string) {
func NewWorkQueueConsumer(_ configs.Configuration, topicHandlers map[string]definition.Consumer) error {
for topic, c := range topicHandlers {
go func(consumer definition.Consumer, topic string) {
handler := &consumerHandler{
processingSemaphore: make(chan struct{}, consumer.Concurrency),
consumer: consumer,
Expand All @@ -54,7 +46,7 @@ func (f consumerFactory) New(_ configs.Configuration) error {

type consumerHandler struct {
processingSemaphore chan struct{}
consumer workq.Consumer
consumer definition.Consumer
}

func (h *consumerHandler) Consume(topic string) {
Expand Down
12 changes: 2 additions & 10 deletions pkg/modules/workq/database/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,22 @@ package database

import (
"context"
"path"
"reflect"

"github.com/google/uuid"

"github.com/go-sigma/sigma/pkg/configs"
"github.com/go-sigma/sigma/pkg/dal/dao"
"github.com/go-sigma/sigma/pkg/dal/models"
"github.com/go-sigma/sigma/pkg/modules/workq"
"github.com/go-sigma/sigma/pkg/modules/workq/definition"
"github.com/go-sigma/sigma/pkg/utils"
)

func init() {
workq.ProducerClientFactories[path.Base(reflect.TypeOf(producerFactory{}).PkgPath())] = &producerFactory{}
}

type producer struct {
workQueueServiceFactory dao.WorkQueueServiceFactory
}

type producerFactory struct{}

// NewWorkQueueProducer ...
func (f producerFactory) New(_ configs.Configuration) (workq.WorkQueueProducer, error) {
func NewWorkQueueProducer(_ configs.Configuration, _ map[string]definition.Consumer) (definition.WorkQueueProducer, error) {
p := &producer{
workQueueServiceFactory: dao.NewWorkQueueServiceFactory(),
}
Expand Down
Loading

0 comments on commit b72c6c6

Please sign in to comment.