Skip to content

Commit

Permalink
Merge pull request #4138 from DataDog/appsec-55936-remove-reactive-op…
Browse files Browse the repository at this point in the history
…eration

[APPSEC-55936] Remove Reactive::Operation
  • Loading branch information
Strech authored Dec 12, 2024
2 parents 43498ff + 220c0a1 commit 3c0de7d
Show file tree
Hide file tree
Showing 32 changed files with 297 additions and 500 deletions.
30 changes: 14 additions & 16 deletions lib/datadog/appsec/contrib/graphql/gateway/watcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

require 'json'
require_relative '../../../instrumentation/gateway'
require_relative '../../../reactive/engine'
require_relative '../reactive/multiplex'
require_relative '../../../reactive/operation'

module Datadog
module AppSec
Expand All @@ -24,26 +24,24 @@ def watch_multiplex(gateway = Instrumentation.gateway)
gateway.watch('graphql.multiplex', :appsec) do |stack, gateway_multiplex|
block = false
event = nil

scope = AppSec::Scope.active_scope
engine = AppSec::Reactive::Engine.new

if scope
AppSec::Reactive::Operation.new('graphql.multiplex') do |op|
GraphQL::Reactive::Multiplex.subscribe(op, scope.processor_context) do |result|
event = {
waf_result: result,
trace: scope.trace,
span: scope.service_entry_span,
multiplex: gateway_multiplex,
actions: result.actions
}

Datadog::AppSec::Event.tag_and_keep!(scope, result)
scope.processor_context.events << event
end
GraphQL::Reactive::Multiplex.subscribe(engine, scope.processor_context) do |result|
event = {
waf_result: result,
trace: scope.trace,
span: scope.service_entry_span,
multiplex: gateway_multiplex,
actions: result.actions
}

block = GraphQL::Reactive::Multiplex.publish(op, gateway_multiplex)
Datadog::AppSec::Event.tag_and_keep!(scope, result)
scope.processor_context.events << event
end

block = GraphQL::Reactive::Multiplex.publish(engine, gateway_multiplex)
end

next [nil, [[:block, event]]] if block
Expand Down
8 changes: 4 additions & 4 deletions lib/datadog/appsec/contrib/graphql/reactive/multiplex.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ module Multiplex
].freeze
private_constant :ADDRESSES

def self.publish(op, gateway_multiplex)
def self.publish(engine, gateway_multiplex)
catch(:block) do
op.publish('graphql.server.all_resolvers', gateway_multiplex.arguments)
engine.publish('graphql.server.all_resolvers', gateway_multiplex.arguments)

nil
end
end

def self.subscribe(op, waf_context)
op.subscribe(*ADDRESSES) do |*values|
def self.subscribe(engine, waf_context)
engine.subscribe(*ADDRESSES) do |*values|
Datadog.logger.debug { "reacted to #{ADDRESSES.inspect}: #{values.inspect}" }
arguments = values[0]

Expand Down
121 changes: 55 additions & 66 deletions lib/datadog/appsec/contrib/rack/gateway/watcher.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

require_relative '../../../instrumentation/gateway'
require_relative '../../../reactive/operation'
require_relative '../../../reactive/engine'
require_relative '../reactive/request'
require_relative '../reactive/request_body'
require_relative '../reactive/response'
Expand All @@ -25,32 +25,29 @@ def watch

def watch_request(gateway = Instrumentation.gateway)
gateway.watch('rack.request', :appsec) do |stack, gateway_request|
block = false
event = nil
scope = gateway_request.env[Datadog::AppSec::Ext::SCOPE_KEY]

AppSec::Reactive::Operation.new('rack.request') do |op|
Rack::Reactive::Request.subscribe(op, scope.processor_context) do |result|
if result.status == :match
# TODO: should this hash be an Event instance instead?
event = {
waf_result: result,
trace: scope.trace,
span: scope.service_entry_span,
request: gateway_request,
actions: result.actions
}

# We want to keep the trace in case of security event
scope.trace.keep! if scope.trace
Datadog::AppSec::Event.tag_and_keep!(scope, result)
scope.processor_context.events << event
end
engine = AppSec::Reactive::Engine.new

Rack::Reactive::Request.subscribe(engine, scope.processor_context) do |result|
if result.status == :match
# TODO: should this hash be an Event instance instead?
event = {
waf_result: result,
trace: scope.trace,
span: scope.service_entry_span,
request: gateway_request,
actions: result.actions
}

# We want to keep the trace in case of security event
scope.trace.keep! if scope.trace
Datadog::AppSec::Event.tag_and_keep!(scope, result)
scope.processor_context.events << event
end

block = Rack::Reactive::Request.publish(op, gateway_request)
end

block = Rack::Reactive::Request.publish(engine, gateway_request)
next [nil, [[:block, event]]] if block

ret, res = stack.call(gateway_request.request)
Expand All @@ -66,33 +63,29 @@ def watch_request(gateway = Instrumentation.gateway)

def watch_response(gateway = Instrumentation.gateway)
gateway.watch('rack.response', :appsec) do |stack, gateway_response|
block = false

event = nil
scope = gateway_response.scope

AppSec::Reactive::Operation.new('rack.response') do |op|
Rack::Reactive::Response.subscribe(op, scope.processor_context) do |result|
if result.status == :match
# TODO: should this hash be an Event instance instead?
event = {
waf_result: result,
trace: scope.trace,
span: scope.service_entry_span,
response: gateway_response,
actions: result.actions
}

# We want to keep the trace in case of security event
scope.trace.keep! if scope.trace
Datadog::AppSec::Event.tag_and_keep!(scope, result)
scope.processor_context.events << event
end
engine = AppSec::Reactive::Engine.new

Rack::Reactive::Response.subscribe(engine, scope.processor_context) do |result|
if result.status == :match
# TODO: should this hash be an Event instance instead?
event = {
waf_result: result,
trace: scope.trace,
span: scope.service_entry_span,
response: gateway_response,
actions: result.actions
}

# We want to keep the trace in case of security event
scope.trace.keep! if scope.trace
Datadog::AppSec::Event.tag_and_keep!(scope, result)
scope.processor_context.events << event
end

block = Rack::Reactive::Response.publish(op, gateway_response)
end

block = Rack::Reactive::Response.publish(engine, gateway_response)
next [nil, [[:block, event]]] if block

ret, res = stack.call(gateway_response.response)
Expand All @@ -108,33 +101,29 @@ def watch_response(gateway = Instrumentation.gateway)

def watch_request_body(gateway = Instrumentation.gateway)
gateway.watch('rack.request.body', :appsec) do |stack, gateway_request|
block = false

event = nil
scope = gateway_request.env[Datadog::AppSec::Ext::SCOPE_KEY]

AppSec::Reactive::Operation.new('rack.request.body') do |op|
Rack::Reactive::RequestBody.subscribe(op, scope.processor_context) do |result|
if result.status == :match
# TODO: should this hash be an Event instance instead?
event = {
waf_result: result,
trace: scope.trace,
span: scope.service_entry_span,
request: gateway_request,
actions: result.actions
}

# We want to keep the trace in case of security event
scope.trace.keep! if scope.trace
Datadog::AppSec::Event.tag_and_keep!(scope, result)
scope.processor_context.events << event
end
engine = AppSec::Reactive::Engine.new

Rack::Reactive::RequestBody.subscribe(engine, scope.processor_context) do |result|
if result.status == :match
# TODO: should this hash be an Event instance instead?
event = {
waf_result: result,
trace: scope.trace,
span: scope.service_entry_span,
request: gateway_request,
actions: result.actions
}

# We want to keep the trace in case of security event
scope.trace.keep! if scope.trace
Datadog::AppSec::Event.tag_and_keep!(scope, result)
scope.processor_context.events << event
end

block = Rack::Reactive::RequestBody.publish(op, gateway_request)
end

block = Rack::Reactive::RequestBody.publish(engine, gateway_request)
next [nil, [[:block, event]]] if block

ret, res = stack.call(gateway_request.request)
Expand Down
18 changes: 9 additions & 9 deletions lib/datadog/appsec/contrib/rack/reactive/request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@ module Request
].freeze
private_constant :ADDRESSES

def self.publish(op, gateway_request)
def self.publish(engine, gateway_request)
catch(:block) do
op.publish('request.query', gateway_request.query)
op.publish('request.headers', gateway_request.headers)
op.publish('request.uri.raw', gateway_request.fullpath)
op.publish('request.cookies', gateway_request.cookies)
op.publish('request.client_ip', gateway_request.client_ip)
op.publish('server.request.method', gateway_request.method)
engine.publish('request.query', gateway_request.query)
engine.publish('request.headers', gateway_request.headers)
engine.publish('request.uri.raw', gateway_request.fullpath)
engine.publish('request.cookies', gateway_request.cookies)
engine.publish('request.client_ip', gateway_request.client_ip)
engine.publish('server.request.method', gateway_request.method)

nil
end
end

def self.subscribe(op, waf_context)
op.subscribe(*ADDRESSES) do |*values|
def self.subscribe(engine, waf_context)
engine.subscribe(*ADDRESSES) do |*values|
Datadog.logger.debug { "reacted to #{ADDRESSES.inspect}: #{values.inspect}" }

headers = values[0]
Expand Down
8 changes: 4 additions & 4 deletions lib/datadog/appsec/contrib/rack/reactive/request_body.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ module RequestBody
].freeze
private_constant :ADDRESSES

def self.publish(op, gateway_request)
def self.publish(engine, gateway_request)
catch(:block) do
# params have been parsed from the request body
op.publish('request.body', gateway_request.form_hash)
engine.publish('request.body', gateway_request.form_hash)

nil
end
end

def self.subscribe(op, waf_context)
op.subscribe(*ADDRESSES) do |*values|
def self.subscribe(engine, waf_context)
engine.subscribe(*ADDRESSES) do |*values|
Datadog.logger.debug { "reacted to #{ADDRESSES.inspect}: #{values.inspect}" }
body = values[0]

Expand Down
10 changes: 5 additions & 5 deletions lib/datadog/appsec/contrib/rack/reactive/response.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ module Response
].freeze
private_constant :ADDRESSES

def self.publish(op, gateway_response)
def self.publish(engine, gateway_response)
catch(:block) do
op.publish('response.status', gateway_response.status)
op.publish('response.headers', gateway_response.headers)
engine.publish('response.status', gateway_response.status)
engine.publish('response.headers', gateway_response.headers)

nil
end
end

def self.subscribe(op, waf_context)
op.subscribe(*ADDRESSES) do |*values|
def self.subscribe(engine, waf_context)
engine.subscribe(*ADDRESSES) do |*values|
Datadog.logger.debug { "reacted to #{ADDRESSES.inspect}: #{values.inspect}" }

response_status = values[0]
Expand Down
38 changes: 17 additions & 21 deletions lib/datadog/appsec/contrib/rails/gateway/watcher.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

require_relative '../../../instrumentation/gateway'
require_relative '../../../reactive/operation'
require_relative '../../../reactive/engine'
require_relative '../reactive/action'
require_relative '../../../event'

Expand All @@ -21,33 +21,29 @@ def watch

def watch_request_action(gateway = Instrumentation.gateway)
gateway.watch('rails.request.action', :appsec) do |stack, gateway_request|
block = false

event = nil
scope = gateway_request.env[Datadog::AppSec::Ext::SCOPE_KEY]
engine = AppSec::Reactive::Engine.new

AppSec::Reactive::Operation.new('rails.request.action') do |op|
Rails::Reactive::Action.subscribe(op, scope.processor_context) do |result|
if result.status == :match
# TODO: should this hash be an Event instance instead?
event = {
waf_result: result,
trace: scope.trace,
span: scope.service_entry_span,
request: gateway_request,
actions: result.actions
}
Rails::Reactive::Action.subscribe(engine, scope.processor_context) do |result|
if result.status == :match
# TODO: should this hash be an Event instance instead?
event = {
waf_result: result,
trace: scope.trace,
span: scope.service_entry_span,
request: gateway_request,
actions: result.actions
}

# We want to keep the trace in case of security event
scope.trace.keep! if scope.trace
Datadog::AppSec::Event.tag_and_keep!(scope, result)
scope.processor_context.events << event
end
# We want to keep the trace in case of security event
scope.trace.keep! if scope.trace
Datadog::AppSec::Event.tag_and_keep!(scope, result)
scope.processor_context.events << event
end

block = Rails::Reactive::Action.publish(op, gateway_request)
end

block = Rails::Reactive::Action.publish(engine, gateway_request)
next [nil, [[:block, event]]] if block

ret, res = stack.call(gateway_request.request)
Expand Down
10 changes: 5 additions & 5 deletions lib/datadog/appsec/contrib/rails/reactive/action.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ module Action
].freeze
private_constant :ADDRESSES

def self.publish(op, gateway_request)
def self.publish(engine, gateway_request)
catch(:block) do
# params have been parsed from the request body
op.publish('rails.request.body', gateway_request.parsed_body)
op.publish('rails.request.route_params', gateway_request.route_params)
engine.publish('rails.request.body', gateway_request.parsed_body)
engine.publish('rails.request.route_params', gateway_request.route_params)

nil
end
end

def self.subscribe(op, waf_context)
op.subscribe(*ADDRESSES) do |*values|
def self.subscribe(engine, waf_context)
engine.subscribe(*ADDRESSES) do |*values|
Datadog.logger.debug { "reacted to #{ADDRESSES.inspect}: #{values.inspect}" }
body = values[0]
path_params = values[1]
Expand Down
Loading

0 comments on commit 3c0de7d

Please sign in to comment.