Skip to content

Commit

Permalink
Implement general components.
Browse files Browse the repository at this point in the history
  • Loading branch information
illenko committed Aug 18, 2024
0 parents commit 5e25912
Show file tree
Hide file tree
Showing 25 changed files with 646 additions and 0 deletions.
34 changes: 34 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# .github/workflows/go.yml
name: Go CI

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

jobs:
build:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.22.3'

- name: Install dependencies
run: go mod tidy

- name: Install golangci-lint
run: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.50.1
- name: Run goimports
run: goimports -d .

- name: Run golangci-lint
run: golangci-lint run
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Messaging driven architecture with RabbitMQ

### Stack
- Go 1.22.3
- RabbitMQ 3.13.6
- amqp091-go v1.10.0

### Architecture
![img.png](img.png)
27 changes: 27 additions & 0 deletions common/amqpmodel/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package amqpmodel

import "github.com/google/uuid"

type OrderAction struct {
ID uuid.UUID `json:"id"`
CustomerID uuid.UUID `json:"customerId"`
CardID uuid.UUID `json:"cardId"`
ItemID uuid.UUID `json:"itemId"`
Price int `json:"price"`
}

type OrderActionResultStatus string

const (
OrderActionResultStatusSuccess OrderActionResultStatus = "success"
OrderActionResultStatusFailed OrderActionResultStatus = "failed"
)

type OrderActionResult struct {
ID uuid.UUID `json:"id"`
CustomerID uuid.UUID `json:"customerId"`
CardID uuid.UUID `json:"cardId"`
ItemID uuid.UUID `json:"itemId"`
Price int `json:"price"`
Status OrderActionResultStatus `json:"status"`
}
18 changes: 18 additions & 0 deletions common/connection/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package connection

import amqp "github.com/rabbitmq/amqp091-go"

func ConnectToRabbitMQ() (*amqp.Connection, *amqp.Channel, error) {
conn, err := amqp.Dial("amqp://user:password@localhost:5672/")
if err != nil {
return nil, nil, err
}

ch, err := conn.Channel()
if err != nil {
conn.Close()
return nil, nil, err
}

return conn, ch, nil
}
36 changes: 36 additions & 0 deletions common/consumer/order_action.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package consumer

import (
"encoding/json"
"log"

"github.com/illenko/common/amqpmodel"
amqp "github.com/rabbitmq/amqp091-go"
)

func ConsumeOrderAction(ch *amqp.Channel, queueName string, processFunc func(*amqp.Channel, amqpmodel.OrderAction)) {
msgs, err := ch.Consume(
queueName,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}

for msg := range msgs {
var orderAction amqpmodel.OrderAction
err := json.Unmarshal(msg.Body, &orderAction)
if err != nil {
log.Printf("Failed to unmarshal message: %v", err)
continue
}

log.Printf("Received a message from %v: %v", queueName, orderAction)
processFunc(ch, orderAction)
}
}
8 changes: 8 additions & 0 deletions common/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/illenko/common

go 1.22.3

require (
github.com/google/uuid v1.6.0
github.com/rabbitmq/amqp091-go v1.10.0
)
14 changes: 14 additions & 0 deletions common/mapper/order_action_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package mapper

import "github.com/illenko/common/amqpmodel"

func ToOrderActionResult(orderAction amqpmodel.OrderAction, status amqpmodel.OrderActionResultStatus) amqpmodel.OrderActionResult {
return amqpmodel.OrderActionResult{
ID: orderAction.ID,
CustomerID: orderAction.CustomerID,
CardID: orderAction.CardID,
ItemID: orderAction.ItemID,
Price: orderAction.Price,
Status: status,
}
}
31 changes: 31 additions & 0 deletions common/publisher/order_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package publisher

import (
"encoding/json"
"log"

"github.com/illenko/common/amqpmodel"
amqp "github.com/rabbitmq/amqp091-go"
)

func PublishOrderResult(ch *amqp.Channel, exchangeName, routingKey string, orderResult amqpmodel.OrderActionResult) error {
body, err := json.Marshal(orderResult)
if err != nil {
return err
}

err = ch.Publish(
exchangeName,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
})

if err == nil {
log.Printf("Order result published: %v in %v with %v rk", orderResult, exchangeName, routingKey)
}
return err
}
11 changes: 11 additions & 0 deletions environment/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: '3.8'

services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672" # RabbitMQ default port
- "15672:15672" # RabbitMQ management plugin port
environment:
RABBITMQ_DEFAULT_USER: user
RABBITMQ_DEFAULT_PASS: password
Binary file added img.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
25 changes: 25 additions & 0 deletions order-service/expired_actions_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package main

import (
"log"

"github.com/illenko/common/amqpmodel"
"github.com/illenko/common/consumer"
amqp "github.com/rabbitmq/amqp091-go"
)

func consumeExpiredProductReservation(ch *amqp.Channel) {
consumer.ConsumeOrderAction(ch, "dlx-product-reservation-action-queue", processExpiredProductReservation)
}

func processExpiredProductReservation(_ *amqp.Channel, orderAction amqpmodel.OrderAction) {
log.Printf("Processing expired product reservation: %v and other work will be done...", orderAction)
}

func consumeExpiredPayment(ch *amqp.Channel) {
consumer.ConsumeOrderAction(ch, "dlx-payment-action-queue", processExpiredPayment)
}

func processExpiredPayment(_ *amqp.Channel, orderAction amqpmodel.OrderAction) {
log.Printf("Processing expired payment: %v and other work will be done...", orderAction)
}
11 changes: 11 additions & 0 deletions order-service/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/illenko/order-service

go 1.22.3

require (
github.com/google/uuid v1.6.0
github.com/illenko/common v0.0.0-00010101000000-000000000000
github.com/rabbitmq/amqp091-go v1.10.0
)

replace github.com/illenko/common => ../common
6 changes: 6 additions & 0 deletions order-service/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
27 changes: 27 additions & 0 deletions order-service/http_model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import "github.com/google/uuid"

type OrderStatus string

const (
Pending OrderStatus = "pending"
Completed OrderStatus = "completed"
Cancelled OrderStatus = "cancelled"
)

type OrderRequest struct {
CustomerID uuid.UUID `json:"customerId"`
CardID uuid.UUID `json:"cardId"`
ItemID uuid.UUID `json:"itemId"`
Price int `json:"price"`
}

type OrderResponse struct {
ID uuid.UUID `json:"id"`
CustomerID uuid.UUID `json:"customerId"`
CardID uuid.UUID `json:"cardId"`
ItemID uuid.UUID `json:"itemId"`
Price int `json:"price"`
Status OrderStatus `json:"status"`
}
68 changes: 68 additions & 0 deletions order-service/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package main

import (
"log"

"github.com/google/uuid"
"github.com/illenko/common/amqpmodel"
"github.com/illenko/common/connection"
)

func main() {
conn, ch, err := connection.ConnectToRabbitMQ()
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
defer ch.Close()

err = declareDLXExchange(ch)
if err != nil {
log.Fatalf("Failed to declare DLX exchange: %v", err)
}

err = setupDLXQueues(ch, []QueueConfig{
{Name: "dlx-product-reservation-action-queue"},
{Name: "dlx-payment-action-queue"},
})
if err != nil {
log.Fatalf("Failed to setup DLX queues: %v", err)
}

err = setupExchangeAndQueues(ch, "order-action-exchange", []QueueConfig{
{Name: "product-reservation-action-queue", TTL: 15000, DLX: "dlx-exchange"},
{Name: "product-cancellation-action-queue"},
{Name: "payment-action-queue", TTL: 60000, DLX: "dlx-exchange"},
})
if err != nil {
log.Fatalf("Failed to setup order-action-exchange: %v", err)
}

err = setupExchangeAndQueues(ch, "order-result-exchange", []QueueConfig{
{Name: "product-reservation-result-queue"},
{Name: "payment-result-queue"},
})
if err != nil {
log.Fatalf("Failed to setup order-result-exchange: %v", err)
}

orderAction := amqpmodel.OrderAction{
ID: uuid.New(),
CustomerID: uuid.New(),
CardID: uuid.New(),
ItemID: uuid.New(),
Price: 100,
}

err = publishOrderAction(ch, "order-action-exchange", "product-reservation-action-queue", orderAction)
if err != nil {
log.Fatalf("Failed to publish order: %v", err)
}

go consumeExpiredPayment(ch)
go consumeExpiredProductReservation(ch)
go consumeProductReservationMessages(ch)
go consumePaymentMessages(ch)

select {}
}
31 changes: 31 additions & 0 deletions order-service/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"encoding/json"
"log"

"github.com/illenko/common/amqpmodel"
amqp "github.com/rabbitmq/amqp091-go"
)

func publishOrderAction(ch *amqp.Channel, exchangeName, routingKey string, order amqpmodel.OrderAction) error {
body, err := json.Marshal(order)
if err != nil {
return err
}

err = ch.Publish(
exchangeName,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
})

if err == nil {
log.Printf("Order action published: %v in %v with %v rk", order, exchangeName, routingKey)
}
return err
}
Loading

0 comments on commit 5e25912

Please sign in to comment.