Skip to content
This repository has been archived by the owner on Apr 5, 2023. It is now read-only.

Eh main/feature/consumer #18

Merged
merged 112 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
112 commits
Select commit Hold shift + click to select a range
f60cb61
feat: add worker class to manager the active job
eHattori Apr 11, 2022
8ad2bc1
feat: add libraries to gemfile
eHattori Apr 11, 2022
642d029
feat: add config to rubocop
eHattori Apr 11, 2022
d9c66b4
feat: add worker to boot app
eHattori Apr 11, 2022
f9be3ab
feat: add default worker options
eHattori Apr 11, 2022
c6b6bba
feat: add self.included to add attr in runtime
eHattori Apr 11, 2022
9597d62
feat: add .env.local to dev enviroment
eHattori Apr 12, 2022
52bdc63
refact: url of queue
eHattori Apr 12, 2022
21b9a52
feat: implement a perform_async method
eHattori Apr 12, 2022
6978417
feat: create a parent broker and impl aws_broker
eHattori Apr 12, 2022
44efd09
test: implement fail to broker
eHattori Apr 13, 2022
80a06e2
feat: add config to broker
eHattori Apr 13, 2022
3f4a254
feat: add resourceError
eHattori Apr 14, 2022
0df13ca
fix: stub problem and configure poller
eHattori Apr 14, 2022
496b17b
feat: a test to aws_broker
eHattori Apr 14, 2022
3d6922d
feat: adjust erros to aws_broker
eHattori Apr 15, 2022
20b03f4
feat: adjust options
eHattori Apr 15, 2022
6a8716f
feat: add test to poller success
eHattori Apr 15, 2022
fc0ec26
refact: options to isolate arguments
eHattori Apr 15, 2022
3de01cf
refact: build_instance_broker
eHattori Apr 15, 2022
2e3ad1f
refact: perform_asyn to process_message
eHattori Apr 15, 2022
45296e5
Ignore log folder
ferreira-mev Apr 18, 2022
ba704e9
Add "spec" file for the logger module
ferreira-mev Apr 18, 2022
a3c4b65
Outline Logging module (shared logger object provider)
ferreira-mev Apr 18, 2022
65b32c6
Format JSON output, improve static method inclusion, require logging …
ferreira-mev Apr 18, 2022
14b84c6
Refactor to include Logging as part of PipefyMessage
ferreira-mev Apr 18, 2022
dd2394e
Include calling obj information on JSON output
ferreira-mev Apr 19, 2022
4deebe1
Add preliminary comments and logs to consumer implementation
ferreira-mev Apr 19, 2022
715bfcb
Rephrase log message
ferreira-mev Apr 19, 2022
6a59709
feat: add worker class to manager the active job
eHattori Apr 11, 2022
4cb8c1c
feat: add libraries to gemfile
eHattori Apr 11, 2022
913f990
feat: add config to rubocop
eHattori Apr 11, 2022
df2b7cf
feat: add worker to boot app
eHattori Apr 11, 2022
e5ff577
feat: add default worker options
eHattori Apr 11, 2022
b40d85f
feat: add self.included to add attr in runtime
eHattori Apr 11, 2022
cb790c3
feat: add .env.local to dev enviroment
eHattori Apr 12, 2022
f2cf8a0
refact: url of queue
eHattori Apr 12, 2022
7c82f84
feat: implement a perform_async method
eHattori Apr 12, 2022
1045cf4
feat: create a parent broker and impl aws_broker
eHattori Apr 12, 2022
8155d8c
test: implement fail to broker
eHattori Apr 13, 2022
c7e2dd6
feat: add config to broker
eHattori Apr 13, 2022
109a8f3
feat: add resourceError
eHattori Apr 14, 2022
c5ba545
fix: stub problem and configure poller
eHattori Apr 14, 2022
0b1db3f
feat: a test to aws_broker
eHattori Apr 14, 2022
4d7dd50
feat: adjust erros to aws_broker
eHattori Apr 15, 2022
374fa3e
feat: adjust options
eHattori Apr 15, 2022
7f68344
feat: add test to poller success
eHattori Apr 15, 2022
40b85ab
refact: options to isolate arguments
eHattori Apr 15, 2022
eeac6a2
refact: build_instance_broker
eHattori Apr 15, 2022
dc6f246
refact: perform_asyn to process_message
eHattori Apr 15, 2022
9eb73b4
Merge branch 'eh-main/feature/consumer' of github.com:pipefy/pipefy_m…
eHattori Apr 19, 2022
0e2385f
Merge CustomLogger class config into Logging module
ferreira-mev Apr 19, 2022
37f84c6
refact: remove beta consumer version
eHattori Apr 19, 2022
8bd7111
Use Logging module in pipefy_message.rb
ferreira-mev Apr 19, 2022
0ed0fd6
Merge Hattori and Duda's changes (formatting logs, removing tests out…
ferreira-mev Apr 19, 2022
d2f2c5c
refact: logs and elapsed time to ensure
eHattori Apr 19, 2022
52b4d73
refact: remove JSON.parse
eHattori Apr 19, 2022
746f385
Refactor publisher to incorporate new centralized logger instance
ferreira-mev Apr 20, 2022
890bce8
Remove old logger
ferreira-mev Apr 25, 2022
7f38b09
refact: namespaces
eHattori Apr 25, 2022
155f288
refact: rename publisher class
eHattori Apr 25, 2022
a7a32b5
refact: move base_publisher to publisher
eHattori Apr 25, 2022
183b6ce
refact: create instance by inflection
eHattori Apr 25, 2022
91a4cc6
refact: fix "aws" and worker feature
eHattori Apr 25, 2022
6952468
refact: Move common AWS broker methods to superclass
ferreira-mev Apr 26, 2022
f8f5be6
refact: Document AWS broker class, move shared imports
ferreira-mev Apr 26, 2022
af41655
"fix": Refact SQS err to SNS err on SNS rescue
ferreira-mev Apr 26, 2022
b767f61
refact: Open AWS connection on superclass constructor
ferreira-mev Apr 26, 2022
a21292a
refact: Rewrite publisher logs in JSON format
ferreira-mev Apr 26, 2022
5309c6b
refact: configuration
eHattori Apr 26, 2022
e24bfc1
Documenting SNS broker (partial commit before pulling)
ferreira-mev Apr 26, 2022
f508676
Merge branch 'eh-main/feature/consumer' of github.com:pipefy/pipefy_m…
ferreira-mev Apr 26, 2022
9c8ceee
refact: Remove old AWS configuration class
ferreira-mev Apr 26, 2022
3e454cf
Move SQS, SNS README to AWS client dir
ferreira-mev Apr 26, 2022
fe59799
Handle invalid provider option
ferreira-mev Apr 26, 2022
54dec70
refact: Use described_class rather than hardcoded class name on RSpec…
ferreira-mev Apr 26, 2022
39263ce
refact: Specify which exceptions to handle (WIP - should be improved?)
ferreira-mev Apr 26, 2022
f34f2b0
refact: Separate contexts in worker tests
ferreira-mev Apr 26, 2022
55b1c56
Try to improve worker tests (keyword "try" :P)
ferreira-mev Apr 26, 2022
f654212
refact: Address Rubocop offenses
ferreira-mev Apr 27, 2022
e05ea12
refact: Incorporate review feedback into Logging module
ferreira-mev Apr 27, 2022
aad3ff4
refact: (post-review) Disable AbcSize, MethodLength globally
ferreira-mev Apr 27, 2022
597f7c6
feat: (post-review) Create module to autolog all custom errors
ferreira-mev Apr 27, 2022
b0ca1d7
refact (minor): Please Rubocop
ferreira-mev Apr 27, 2022
f856b3a
refact: (post-review) Move class_path hash to BrokerResolver class
ferreira-mev Apr 27, 2022
7317f1d
refact: Add... missing trailing newline. Yeah. :P
ferreira-mev Apr 27, 2022
788e307
refact: (post-review) Move SQS-specific poller wait time option to Sq…
ferreira-mev Apr 27, 2022
23cb031
refact (post-review): Remove logger methods made unnecessarily verbos…
ferreira-mev Apr 28, 2022
92dd3cc
refact: Update log message to reflect ms time unit
ferreira-mev Apr 28, 2022
7badbfe
refact: [WIP?] Improve separation of AWS-wide and service-specific co…
ferreira-mev Apr 28, 2022
9d08224
refact: Wait longer for localstack init (10 s wasn't enough on my mac…
ferreira-mev Apr 29, 2022
0da3c7f
refact: Add queue name to logs if not already present
ferreira-mev Apr 29, 2022
5298582
refact: Reword/break lines
ferreira-mev Apr 29, 2022
02d097d
refact: Expand AWS SQS, SNS CLI readme
ferreira-mev Apr 29, 2022
4faaf42
refact/fix: Alter test to reflect new default arg value
ferreira-mev Apr 29, 2022
9cc0163
refact: Address some minor issues from the PR review
ferreira-mev Apr 29, 2022
bd1af3a
Remove AwsBroker -> {Sns, Sqs}Broker inheritance
ferreira-mev May 2, 2022
2be266a
refact: Remove currently unused Broker class
ferreira-mev May 2, 2022
d63ca52
(forgot to include all files in the last commit, then pushed :P)
ferreira-mev May 2, 2022
c53ff12
feat/refact?: Allow setting broker options from Worker module and Pub…
ferreira-mev May 2, 2022
253dd05
Add and improve RSpec tests
ferreira-mev May 3, 2022
b2f8d9c
[WIP] Polish
jleber May 3, 2022
6716678
refact: Refactor tests to match lib folder refactor
ferreira-mev May 3, 2022
b59e4f9
Merge pull request #19 from pipefy/ef/refactor/remove-inheritance
ferreira-mev May 4, 2022
2809815
Update project documentation
jleber May 4, 2022
12b6293
Adjust Aws Client
jleber May 4, 2022
1860bcf
Fix for log separator
jleber May 4, 2022
67a2a42
Adjust consumer stuff
jleber May 4, 2022
ecacdb1
Adjust log message and add sample consumer class
jleber May 4, 2022
58f40d6
Linter adjustments
jleber May 4, 2022
6d0f63d
refact (minor): Nitpicky uniformization of || vs fetch
ferreira-mev May 4, 2022
dbe13de
Quick README review w/ minor grammar-related edits and clarifications
ferreira-mev May 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 20 additions & 37 deletions lib/pipefy_message/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module Consumer
include PipefyMessage::Logging
include PipefyMessage::Providers::Errors
include PipefyMessage::Providers::BrokerResolver

##
# Default options for consumer setup.
def self.default_consumer_options
Expand Down Expand Up @@ -47,33 +48,15 @@ module ClassMethods
# Merges default worker options with the hash passed as
# an argument. The latter takes precedence.
def options(opts = {})
eHattori marked this conversation as resolved.
Show resolved Hide resolved
@options = Consumer.default_consumer_options.merge(opts)
@options.each do |k, v|
singleton_class.class_eval { attr_accessor k }
send("#{k}=", v)
end

logger.debug({
options_set: @options,
message_text: "Set #{name} options to options_set"
})
end

##
# Sets broker-specific options to be passed to the broker's
# constructor.
def broker_options(opts = {})
@broker_opts = opts
@consumer_options = Consumer.default_consumer_options.merge(opts)
end

##
# Initializes and returns an instance of a broker for
# the provider specified in the class options.
def build_consumer_instance
options if @broker.nil?
broker_options if @broker_opts.nil?
consumer_map = resolve_broker(@broker, "consumer")
consumer_map[:class_name].constantize.new(@broker_opts)
consumer_map = resolve_broker(@consumer_options[:broker], "consumer")
consumer_map[:class_name].constantize.new(@consumer_options)
eHattori marked this conversation as resolved.
Show resolved Hide resolved
end

##
Expand All @@ -85,27 +68,27 @@ def build_consumer_instance
def process_message
start = Time.now
obj = new
logger.info({
message_text: "Calling poller for #{@broker} object"
})

build_consumer_instance.poller do |message|
logger.info({ message_text: "Calling consumer poller" })

build_consumer_instance.poller do |payload|
logger.info({
message_text: "Message received by poller to be processed by consumer",
received_message: payload
})

obj.perform(payload["Message"])

elapsed_time = (Time.now - start) * 1000.0
logger.info({
message_text: "Message received by #{@broker} poller to be processed by consumer",
received_message: message
duration_ms: elapsed_time,
message_text: "Message received by consumer poller, processed " \
"in #{elapsed_time} milliseconds"
})
obj.perform(message)
end
rescue PipefyMessage::Providers::Errors::ResourceError => e # (any others?)
rescue PipefyMessage::Providers::Errors::ResourceError => e
logger.error("Failed to process message, details #{e.inspect}")
raise e
ensure
elapsed_time = (Time.now - start) * 1000.0
logger.info({
duration_ms: elapsed_time,
message_text: "Message received by #{@broker}" \
"poller processed by #{name} worker" \
"in #{elapsed_time} milliseconds"
})
end
end
end
Expand Down
5 changes: 4 additions & 1 deletion lib/pipefy_message/providers/aws_client/sqs_broker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ def initialize(opts = {})
# Initiates SQS queue polling, with wait_time_seconds as given
# in the initial configuration.
def poller
logger.debug(build_log_hash("Initiating SQS polling on queue #{@config[:queue_name]}"))
logger.info(build_log_hash("Initiating SQS polling on queue #{@config[:queue_name]}"))

@poller.poll(wait_time_seconds: @config[:wait_time_seconds]) do |received_message|
logger.debug(build_log_hash("Message received by SQS poller on queue #{@config[:queue_name]}"))

payload = JSON.parse(received_message.body)
yield(payload)

rescue StandardError => e
raise PipefyMessage::Providers::Errors::ResourceError, e.message
end
end

Expand Down
24 changes: 14 additions & 10 deletions spec/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def poller
end
end

class TestWorker
class TestConsumer
include PipefyMessage::Consumer
options broker: "aws", queue_name: "pipefy-local-queue"

Expand All @@ -20,49 +20,53 @@ def perform(message)
end

RSpec.describe PipefyMessage::Consumer do
before do
ENV["ENABLE_AWS_CLIENT_CONFIG"] = "true"
ENV["AWS_CLI_STUB_RESPONSE"] = "true"
end
describe "#perform" do
context "successful polling" do
it "should call #perform from child instance when #process_message is called" do
mock_broker = instance_double("MockBroker")
allow(mock_broker).to receive(:poller).with(no_args)

allow(TestWorker).to receive(:build_consumer_instance).and_return(mock_broker)
allow(TestConsumer).to receive(:build_consumer_instance).and_return(mock_broker)

TestWorker.process_message
TestConsumer.process_message
expect(mock_broker).to have_received(:poller)
end
end

context "polling failure" do
it "should call #perform from child instance when #process_message is called" do
allow(TestWorker).to receive(:build_consumer_instance).and_return(MockBrokerFail.new)
expect { TestWorker.process_message }.to raise_error(PipefyMessage::Providers::Errors::ResourceError)
allow(TestConsumer).to receive(:build_consumer_instance).and_return(MockBrokerFail.new)
expect { TestConsumer.process_message }.to raise_error(PipefyMessage::Providers::Errors::ResourceError)
end
end

it "should fail if called directly from the parent class" do
expect { TestWorker.perform("message") }.to raise_error NotImplementedError
expect { TestConsumer.perform("message") }.to raise_error NotImplementedError
end
end

describe "#options class" do
it "should set options in class" do
expect(TestWorker.broker).to eq "aws"
expect(TestConsumer.options[:broker]).to eq "aws"
end
end

describe "#build_instance_broker" do
context "invalid provider" do
before(:all) do
TestWorker.broker = "NaN"
TestConsumer.options[:broker] = "NaN"
end

after(:all) do
TestWorker.broker = "aws" # reverting
TestConsumer.options[:broker] = "aws" # reverting
end

it "should raise an error" do
expect { TestWorker.build_consumer_instance }.to raise_error PipefyMessage::Providers::Errors::InvalidOption
expect { TestConsumer.build_consumer_instance }.to raise_error PipefyMessage::Providers::Errors::InvalidOption
end
end

Expand Down