Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: rename collection to collector #81

Merged
merged 1 commit into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"

pubsubsdk "cloud.google.com/go/pubsub"
"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/config"
"github.com/raystack/raccoon/logger"
"github.com/raystack/raccoon/metrics"
Expand All @@ -30,7 +30,7 @@ type Publisher interface {

// StartServer starts the server
func StartServer(ctx context.Context, cancel context.CancelFunc) {
bufferChannel := make(chan collection.CollectRequest, config.Worker.ChannelSize)
bufferChannel := make(chan collector.CollectRequest, config.Worker.ChannelSize)
httpServices := services.Create(bufferChannel)
logger.Info("Start Server -->")
httpServices.Start(ctx, cancel)
Expand All @@ -50,7 +50,7 @@ func StartServer(ctx context.Context, cancel context.CancelFunc) {
go shutDownServer(ctx, cancel, httpServices, bufferChannel, workerPool, publisher)
}

func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices services.Services, bufferChannel chan collection.CollectRequest, workerPool *worker.Pool, pub Publisher) {
func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices services.Services, bufferChannel chan collector.CollectRequest, workerPool *worker.Pool, pub Publisher) {
signalChan := make(chan os.Signal)
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
for {
Expand Down
2 changes: 1 addition & 1 deletion collection/collector.go → collector/collector.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package collection
package collector

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion collection/mock.go → collector/mock.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package collection
package collector

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion collection/service.go → collector/service.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package collection
package collector

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion collection/service_test.go → collector/service_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package collection
package collector

import (
"reflect"
Expand Down
8 changes: 4 additions & 4 deletions services/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"errors"
"time"

"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/config"
"github.com/raystack/raccoon/identification"
"github.com/raystack/raccoon/logger"
Expand All @@ -15,7 +15,7 @@ import (
)

type Handler struct {
C collection.Collector
C collector.Collector
pb.UnimplementedEventServiceServer
}

Expand Down Expand Up @@ -49,7 +49,7 @@ func (h *Handler) SendEvent(ctx context.Context, req *pb.SendEventRequest) (*pb.
h.sendEventCounters(req.Events, identifier.Group)

responseChannel := make(chan *pb.SendEventResponse, 1)
h.C.Collect(ctx, &collection.CollectRequest{
h.C.Collect(ctx, &collector.CollectRequest{
ConnectionIdentifier: identifier,
TimeConsumed: timeConsumed,
SendEventRequest: req,
Expand All @@ -59,7 +59,7 @@ func (h *Handler) SendEvent(ctx context.Context, req *pb.SendEventRequest) (*pb.

}

func (h *Handler) Ack(responseChannel chan *pb.SendEventResponse, reqGuid, connGroup string) collection.AckFunc {
func (h *Handler) Ack(responseChannel chan *pb.SendEventResponse, reqGuid, connGroup string) collector.AckFunc {
switch config.Event.Ack {
case config.Asynchronous:
responseChannel <- &pb.SendEventResponse{
Expand Down
6 changes: 3 additions & 3 deletions services/grpc/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"reflect"
"testing"

"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/config"
"github.com/raystack/raccoon/logger"
"github.com/raystack/raccoon/metrics"
Expand All @@ -23,7 +23,7 @@ func (v void) Write(_ []byte) (int, error) {

func TestHandler_SendEvent(t *testing.T) {
type fields struct {
C collection.Collector
C collector.Collector
UnimplementedEventServiceServer pb.UnimplementedEventServiceServer
}
type args struct {
Expand All @@ -33,7 +33,7 @@ func TestHandler_SendEvent(t *testing.T) {

logger.SetOutput(void{})
metrics.SetVoid()
collector := new(collection.MockCollector)
collector := new(collector.MockCollector)
ctx := context.Background()
meta := metadata.MD{}
meta.Set(config.ServerWs.ConnGroupHeader, "group")
Expand Down
6 changes: 3 additions & 3 deletions services/grpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ import (
"fmt"
"net"

"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/config"
pb "github.com/raystack/raccoon/proto"
"google.golang.org/grpc"
)

type Service struct {
Collector collection.Collector
Collector collector.Collector
s *grpc.Server
}

func NewGRPCService(c collection.Collector) *Service {
func NewGRPCService(c collector.Collector) *Service {
server := grpc.NewServer()
pb.RegisterEventServiceServer(server, &Handler{C: c})
return &Service{
Expand Down
10 changes: 5 additions & 5 deletions services/rest/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"net/http"
"time"

"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/config"
"github.com/raystack/raccoon/deserialization"
"github.com/raystack/raccoon/identification"
Expand All @@ -28,10 +28,10 @@ type serDe struct {
}
type Handler struct {
serDeMap map[string]*serDe
collector collection.Collector
collector collector.Collector
}

func NewHandler(collector collection.Collector) *Handler {
func NewHandler(collector collector.Collector) *Handler {
serDeMap := make(map[string]*serDe)
serDeMap[ContentJSON] = &serDe{
serializer: serialization.SerializeJSON,
Expand Down Expand Up @@ -128,7 +128,7 @@ func (h *Handler) RESTAPIHandler(rw http.ResponseWriter, r *http.Request) {
h.sendEventCounters(req.Events, identifier.Group)

resChannel := make(chan struct{}, 1)
h.collector.Collect(r.Context(), &collection.CollectRequest{
h.collector.Collect(r.Context(), &collector.CollectRequest{
ConnectionIdentifier: identifier,
TimeConsumed: timeConsumed,
SendEventRequest: req,
Expand All @@ -137,7 +137,7 @@ func (h *Handler) RESTAPIHandler(rw http.ResponseWriter, r *http.Request) {
<-resChannel
}

func (h *Handler) Ack(rw http.ResponseWriter, resChannel chan struct{}, s serialization.SerializeFunc, reqGuid string, connGroup string) collection.AckFunc {
func (h *Handler) Ack(rw http.ResponseWriter, resChannel chan struct{}, s serialization.SerializeFunc, reqGuid string, connGroup string) collector.AckFunc {
res := &Response{
SendEventResponse: &pb.SendEventResponse{},
}
Expand Down
6 changes: 3 additions & 3 deletions services/rest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

"github.com/gorilla/mux"
"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/config"
"github.com/raystack/raccoon/metrics"
"github.com/raystack/raccoon/middleware"
Expand All @@ -15,11 +15,11 @@ import (
)

type Service struct {
Collector collection.Collector
Collector collector.Collector
s *http.Server
}

func NewRestService(c collection.Collector) *Service {
func NewRestService(c collector.Collector) *Service {
pingChannel := make(chan connection.Conn, config.ServerWs.ServerMaxConn)
wh := websocket.NewHandler(pingChannel, c)
go websocket.Pinger(pingChannel, config.ServerWs.PingerSize, config.ServerWs.PingInterval, config.ServerWs.WriteWaitInterval)
Expand Down
10 changes: 5 additions & 5 deletions services/rest/websocket/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

"github.com/gorilla/websocket"
"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/config"
"github.com/raystack/raccoon/deserialization"
"github.com/raystack/raccoon/logger"
Expand All @@ -23,7 +23,7 @@ type serDe struct {
type Handler struct {
upgrader *connection.Upgrader
serdeMap map[int]*serDe
collector collection.Collector
collector collector.Collector
PingChannel chan connection.Conn
}

Expand All @@ -41,7 +41,7 @@ func getSerDeMap() map[int]*serDe {
return serDeMap
}

func NewHandler(pingC chan connection.Conn, collector collection.Collector) *Handler {
func NewHandler(pingC chan connection.Conn, collector collector.Collector) *Handler {
ugConfig := connection.UpgraderConfig{
ReadBufferSize: config.ServerWs.ReadBufferSize,
WriteBufferSize: config.ServerWs.WriteBufferSize,
Expand Down Expand Up @@ -115,7 +115,7 @@ func (h *Handler) HandlerWSEvents(w http.ResponseWriter, r *http.Request) {
metrics.Increment("batches_read_total", map[string]string{"status": "success", "conn_group": conn.Identifier.Group, "reason": "NA"})
h.sendEventCounters(payload.Events, conn.Identifier.Group)

h.collector.Collect(r.Context(), &collection.CollectRequest{
h.collector.Collect(r.Context(), &collector.CollectRequest{
ConnectionIdentifier: conn.Identifier,
TimeConsumed: timeConsumed,
SendEventRequest: payload,
Expand All @@ -124,7 +124,7 @@ func (h *Handler) HandlerWSEvents(w http.ResponseWriter, r *http.Request) {
}
}

func (h *Handler) Ack(conn connection.Conn, resChannel chan AckInfo, s serialization.SerializeFunc, messageType int, reqGuid string, timeConsumed time.Time) collection.AckFunc {
func (h *Handler) Ack(conn connection.Conn, resChannel chan AckInfo, s serialization.SerializeFunc, messageType int, reqGuid string, timeConsumed time.Time) collector.AckFunc {
switch config.Event.Ack {
case config.Asynchronous:
writeSuccessResponse(conn, s, messageType, reqGuid)
Expand Down
6 changes: 3 additions & 3 deletions services/rest/websocket/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/logger"
"github.com/raystack/raccoon/metrics"
pb "github.com/raystack/raccoon/proto"
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestNewHandler(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := NewHandler(tt.args.pingC, &collection.MockCollector{}); !reflect.DeepEqual(got, tt.want) {
if got := NewHandler(tt.args.pingC, &collector.MockCollector{}); !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewHandler() = %v, want %v", got, tt.want)
}
})
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestHandler_GETHandlerWSEvents(t *testing.T) {
}

func getRouter(hlr *Handler) http.Handler {
collector := new(collection.MockCollector)
collector := new(collector.MockCollector)
collector.On("Collect", mock.Anything, mock.Anything).Return(nil)
router := mux.NewRouter()
subRouter := router.PathPrefix("/api/v1").Subrouter()
Expand Down
6 changes: 3 additions & 3 deletions services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"net/http"

"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/logger"
"github.com/raystack/raccoon/services/grpc"
"github.com/raystack/raccoon/services/pprof"
Expand Down Expand Up @@ -43,8 +43,8 @@ func (s *Services) Shutdown(ctx context.Context) {
}
}

func Create(b chan collection.CollectRequest) Services {
c := collection.NewChannelCollector(b)
func Create(b chan collector.CollectRequest) Services {
c := collector.NewChannelCollector(b)
return Services{
b: []bootstrapper{
grpc.NewGRPCService(c),
Expand Down
6 changes: 3 additions & 3 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
"time"

"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/logger"
"github.com/raystack/raccoon/metrics"
pb "github.com/raystack/raccoon/proto"
Expand All @@ -25,13 +25,13 @@ type Producer interface {
type Pool struct {
Size int
deliveryChannelSize int
EventsChannel <-chan collection.CollectRequest
EventsChannel <-chan collector.CollectRequest
producer Producer
wg sync.WaitGroup
}

// CreateWorkerPool create new Pool struct given size and EventsChannel worker.
func CreateWorkerPool(size int, eventsChannel <-chan collection.CollectRequest, deliveryChannelSize int, producer Producer) *Pool {
func CreateWorkerPool(size int, eventsChannel <-chan collector.CollectRequest, deliveryChannelSize int, producer Producer) *Pool {
return &Pool{
Size: size,
deliveryChannelSize: deliveryChannelSize,
Expand Down
8 changes: 4 additions & 4 deletions worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"
"time"

"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/identification"
pb "github.com/raystack/raccoon/proto"
"github.com/stretchr/testify/assert"
Expand All @@ -14,7 +14,7 @@ import (
)

func TestWorker(t *testing.T) {
request := &collection.CollectRequest{
request := &collector.CollectRequest{
ConnectionIdentifier: identification.Identifier{
ID: "12345",
Group: "viewer",
Expand All @@ -31,7 +31,7 @@ func TestWorker(t *testing.T) {
m.On("Timing", "processing.latency", mock.Anything, "")
m.On("Count", "kafka_messages_delivered_total", 0, "success=true")
m.On("Count", "kafka_messages_delivered_total", 0, "success=false")
bc := make(chan collection.CollectRequest, 2)
bc := make(chan collector.CollectRequest, 2)
worker := Pool{
Size: 1,
deliveryChannelSize: 0,
Expand All @@ -54,7 +54,7 @@ func TestWorker(t *testing.T) {
t.Run("Flush", func(t *testing.T) {
t.Run("Should block until all messages is processed", func(t *testing.T) {
kp := mockKafkaPublisher{}
bc := make(chan collection.CollectRequest, 2)
bc := make(chan collector.CollectRequest, 2)
m := &mockMetric{}
m.On("Timing", "processing.latency", mock.Anything, "")
m.On("Count", "kafka_messages_delivered_total", 0, "success=false")
Expand Down
Loading