Skip to content

Commit

Permalink
internal/appsec: refactor simple WAF run operations (#2776)
Browse files Browse the repository at this point in the history
Signed-off-by: Eliott Bouhana <[email protected]>
  • Loading branch information
eliottness authored Jul 10, 2024
1 parent 1a13250 commit 457de24
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 84 deletions.
4 changes: 2 additions & 2 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ stages:
- test-apps

variables:
# This base image is created here: https://gitlab.ddbuild.io/DataDog/apm-reliability/benchmarking-platform/-/pipelines/30723596
BASE_CI_IMAGE: 486234852809.dkr.ecr.us-east-1.amazonaws.com/ci/benchmarking-platform:dd-trace-go-30723596
# This base image is created here: https://gitlab.ddbuild.io/DataDog/apm-reliability/benchmarking-platform/-/pipelines/38806487
BASE_CI_IMAGE: 486234852809.dkr.ecr.us-east-1.amazonaws.com/ci/benchmarking-platform:dd-trace-go-38806487
INDEX_FILE: index.txt
KUBERNETES_SERVICE_ACCOUNT_OVERWRITE: dd-trace-go
FF_USE_LEGACY_KUBERNETES_EXECUTION_STRATEGY: "true"
Expand Down
2 changes: 1 addition & 1 deletion internal/appsec/listener/graphqlsec/graphql.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func Install(wafHandle *waf.Handle, cfg *config.Config, lim limiter.Limiter, roo
type wafEventListener struct {
wafHandle *waf.Handle
config *config.Config
addresses map[string]struct{}
addresses listener.AddressSet
limiter limiter.Limiter
wafDiags waf.Diagnostics
once sync.Once
Expand Down
25 changes: 10 additions & 15 deletions internal/appsec/listener/grpcsec/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener/httpsec"
shared "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener/sharedsec"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener/sqlsec"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"

Expand Down Expand Up @@ -53,7 +54,7 @@ func Install(wafHandle *waf.Handle, cfg *config.Config, lim limiter.Limiter, roo
type wafEventListener struct {
wafHandle *waf.Handle
config *config.Config
addresses map[string]struct{}
addresses listener.AddressSet
limiter limiter.Limiter
wafDiags waf.Diagnostics
once sync.Once
Expand Down Expand Up @@ -112,28 +113,22 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types
return
}

if _, ok := l.addresses[httpsec.ServerIoNetURLAddr]; ok {
if l.isSecAddressListened(httpsec.ServerIoNetURLAddr) {
httpsec.RegisterRoundTripperListener(op, &op.SecurityEventsHolder, wafCtx, l.limiter)
}

if sqlsec.SQLAddressesPresent(l.addresses) {
sqlsec.RegisterSQLListener(op, &op.SecurityEventsHolder, wafCtx, l.limiter)
}

// Listen to the UserID address if the WAF rules are using it
if l.isSecAddressListened(httpsec.UserIDAddr) {
// UserIDOperation happens when appsec.SetUser() is called. We run the WAF and apply actions to
// see if the associated user should be blocked. Since we don't control the execution flow in this case
// (SetUser is SDK), we delegate the responsibility of interrupting the handler to the user.
dyngo.On(op, func(op *sharedsec.UserIDOperation, args sharedsec.UserIDOperationArgs) {
values := map[string]any{
httpsec.UserIDAddr: args.UserID,
}
wafResult := shared.RunWAF(wafCtx, waf.RunAddressData{Persistent: values})
if wafResult.HasEvents() {
addEvents(wafResult.Events)
log.Debug("appsec: WAF detected an authenticated user attack: %s", args.UserID)
}
if wafResult.HasActions() {
shared.ProcessActions(op, wafResult.Actions)
}
})
dyngo.On(op, shared.MakeWAFRunListener(&op.SecurityEventsHolder, wafCtx, l.limiter, func(args sharedsec.UserIDOperationArgs) waf.RunAddressData {
return waf.RunAddressData{Persistent: map[string]any{httpsec.UserIDAddr: args.UserID}}
}))
}

values := make(map[string]any, 2) // 2 because the method and client ip addresses are commonly present in the rules
Expand Down
57 changes: 18 additions & 39 deletions internal/appsec/listener/httpsec/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func Install(wafHandle *waf.Handle, cfg *config.Config, lim limiter.Limiter, roo
type wafEventListener struct {
wafHandle *waf.Handle
config *config.Config
addresses map[string]struct{}
addresses listener.AddressSet
limiter limiter.Limiter
wafDiags waf.Diagnostics
once sync.Once
Expand Down Expand Up @@ -98,14 +98,6 @@ func newWafEventListener(wafHandle *waf.Handle, cfg *config.Config, limiter limi
}
}

func sqlAddressesPresent(addresses map[string]struct{}) bool {
_, queryAddr := addresses[sqlsec.ServerDBStatementAddr]
_, driverAddr := addresses[sqlsec.ServerDBTypeAddr]

return queryAddr || driverAddr

}

func (l *wafEventListener) onEvent(op *types.Operation, args types.HandlerOperationArgs) {
wafCtx, err := l.wafHandle.NewContextWithBudget(l.config.WAFTimeout)
if err != nil {
Expand All @@ -120,25 +112,22 @@ func (l *wafEventListener) onEvent(op *types.Operation, args types.HandlerOperat
}

if _, ok := l.addresses[ServerIoNetURLAddr]; ok {
RegisterRoundTripperListener(op, &op.SecurityEventsHolder, wafCtx, l.limiter)
dyngo.On(op, shared.MakeWAFRunListener(&op.SecurityEventsHolder, wafCtx, l.limiter, func(args types.RoundTripOperationArgs) waf.RunAddressData {
return waf.RunAddressData{Ephemeral: map[string]any{ServerIoNetURLAddr: args.URL}}
}))
}

if sqlAddressesPresent(l.addresses) {
if sqlsec.SQLAddressesPresent(l.addresses) {
sqlsec.RegisterSQLListener(op, &op.SecurityEventsHolder, wafCtx, l.limiter)
}

if _, ok := l.addresses[UserIDAddr]; ok {
// OnUserIDOperationStart happens when appsec.SetUser() is called. We run the WAF and apply actions to
// see if the associated user should be blocked. Since we don't control the execution flow in this case
// (SetUser is SDK), we delegate the responsibility of interrupting the handler to the user.
dyngo.On(op, func(operation *sharedsec.UserIDOperation, args sharedsec.UserIDOperationArgs) {
wafResult := shared.RunWAF(wafCtx, waf.RunAddressData{Persistent: map[string]any{UserIDAddr: args.UserID}})
if wafResult.HasActions() || wafResult.HasEvents() {
shared.ProcessActions(operation, wafResult.Actions)
shared.AddSecurityEvents(&op.SecurityEventsHolder, l.limiter, wafResult.Events)
log.Debug("appsec: WAF detected a suspicious user: %s", args.UserID)
}
})
dyngo.On(op, shared.MakeWAFRunListener(&op.SecurityEventsHolder, wafCtx, l.limiter, func(args sharedsec.UserIDOperationArgs) waf.RunAddressData {
return waf.RunAddressData{Persistent: map[string]any{UserIDAddr: args.UserID}}
}))
}

values := make(map[string]any, 8)
Expand Down Expand Up @@ -170,16 +159,8 @@ func (l *wafEventListener) onEvent(op *types.Operation, args types.HandlerOperat
}
}
}
if l.canExtractSchemas() {
// This address will be passed as persistent. The WAF will keep it in store and trigger schema extraction
// for each run.
values["waf.context.processor"] = map[string]any{"extract-schema": true}
}

wafResult := shared.RunWAF(wafCtx, waf.RunAddressData{Persistent: values})
for tag, value := range wafResult.Derivatives {
op.AddSerializableTag(tag, value)
}
if wafResult.HasActions() || wafResult.HasEvents() {
interrupt := shared.ProcessActions(op, wafResult.Actions)
shared.AddSecurityEvents(&op.SecurityEventsHolder, l.limiter, wafResult.Events)
Expand All @@ -191,23 +172,21 @@ func (l *wafEventListener) onEvent(op *types.Operation, args types.HandlerOperat
}

if _, ok := l.addresses[ServerRequestBodyAddr]; ok {
dyngo.On(op, func(sdkBodyOp *types.SDKBodyOperation, args types.SDKBodyOperationArgs) {
wafResult := shared.RunWAF(wafCtx, waf.RunAddressData{Persistent: map[string]any{ServerRequestBodyAddr: args.Body}})
for tag, value := range wafResult.Derivatives {
op.AddSerializableTag(tag, value)
}
if wafResult.HasActions() || wafResult.HasEvents() {
shared.ProcessActions(sdkBodyOp, wafResult.Actions)
shared.AddSecurityEvents(&op.SecurityEventsHolder, l.limiter, wafResult.Events)
log.Debug("appsec: WAF detected a suspicious request body")
}
})
dyngo.On(op, shared.MakeWAFRunListener(&op.SecurityEventsHolder, wafCtx, l.limiter, func(args types.SDKBodyOperationArgs) waf.RunAddressData {
return waf.RunAddressData{Persistent: map[string]any{ServerRequestBodyAddr: args.Body}}
}))
}

dyngo.OnFinish(op, func(op *types.Operation, res types.HandlerOperationRes) {
defer wafCtx.Close()

values = make(map[string]any, 2)
values = make(map[string]any, 3)
if l.canExtractSchemas() {
// This address will be passed as persistent. The WAF will keep it in store and trigger schema extraction
// for each run.
values["waf.context.processor"] = map[string]any{"extract-schema": true}
}

if _, ok := l.addresses[ServerResponseStatusAddr]; ok {
// serverResponseStatusAddr is a string address, so we must format the status code...
values[ServerResponseStatusAddr] = fmt.Sprintf("%d", res.Status)
Expand Down
15 changes: 3 additions & 12 deletions internal/appsec/listener/httpsec/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,14 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/httpsec/types"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener/sharedsec"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/trace"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"

"github.com/DataDog/appsec-internal-go/limiter"
"github.com/DataDog/go-libddwaf/v3"
)

// RegisterRoundTripperListener registers a listener on outgoing HTTP client requests to run the WAF.
func RegisterRoundTripperListener(op dyngo.Operation, events *trace.SecurityEventsHolder, wafCtx *waf.Context, limiter limiter.Limiter) {
dyngo.On(op, func(op *types.RoundTripOperation, args types.RoundTripOperationArgs) {
wafResult := sharedsec.RunWAF(wafCtx, waf.RunAddressData{Ephemeral: map[string]any{ServerIoNetURLAddr: args.URL}})
if !wafResult.HasEvents() {
return
}

log.Debug("appsec: WAF detected a suspicious outgoing request URL: %s", args.URL)

sharedsec.ProcessActions(op, wafResult.Actions)
sharedsec.AddSecurityEvents(events, limiter, wafResult.Events)
})
dyngo.On(op, sharedsec.MakeWAFRunListener(events, wafCtx, limiter, func(args types.RoundTripOperationArgs) waf.RunAddressData {
return waf.RunAddressData{Ephemeral: map[string]any{ServerIoNetURLAddr: args.URL}}
}))
}
19 changes: 19 additions & 0 deletions internal/appsec/listener/sharedsec/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,25 @@ func RunWAF(wafCtx *waf.Context, values waf.RunAddressData) waf.Result {
return result
}

func MakeWAFRunListener[O dyngo.Operation, T dyngo.ArgOf[O]](
events *trace.SecurityEventsHolder,
wafCtx *waf.Context,
limiter limiter.Limiter,
toRunAddressData func(T) waf.RunAddressData,
) func(O, T) {
return func(op O, args T) {
wafResult := RunWAF(wafCtx, toRunAddressData(args))
if !wafResult.HasEvents() {
return
}

log.Debug("appsec: WAF detected a suspicious WAF event")

ProcessActions(op, wafResult.Actions)
AddSecurityEvents(events, limiter, wafResult.Events)
}
}

// AddSecurityEvents is a helper function to add sec events to an operation taking into account the rate limiter.
func AddSecurityEvents(holder *trace.SecurityEventsHolder, limiter limiter.Limiter, matches []any) {
if len(matches) > 0 && limiter.Allow() {
Expand Down
27 changes: 12 additions & 15 deletions internal/appsec/listener/sqlsec/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ package sqlsec
import (
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/sqlsec/types"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener/sharedsec"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/trace"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"

"github.com/DataDog/appsec-internal-go/limiter"
waf "github.com/DataDog/go-libddwaf/v3"
Expand All @@ -22,18 +22,15 @@ const (
)

func RegisterSQLListener(op dyngo.Operation, events *trace.SecurityEventsHolder, wafCtx *waf.Context, limiter limiter.Limiter) {
dyngo.On(op, func(op *types.SQLOperation, args types.SQLOperationArgs) {
wafResult := sharedsec.RunWAF(wafCtx, waf.RunAddressData{Ephemeral: map[string]any{
ServerDBStatementAddr: args.Query,
ServerDBTypeAddr: args.Driver,
}})
if !wafResult.HasEvents() {
return
}

log.Debug("appsec: WAF detected a suspicious SQL operation")

sharedsec.ProcessActions(op, wafResult.Actions)
sharedsec.AddSecurityEvents(events, limiter, wafResult.Events)
})
dyngo.On(op, sharedsec.MakeWAFRunListener(events, wafCtx, limiter, func(args types.SQLOperationArgs) waf.RunAddressData {
return waf.RunAddressData{Ephemeral: map[string]any{ServerDBStatementAddr: args.Query, ServerDBTypeAddr: args.Driver}}
}))
}

func SQLAddressesPresent(addresses listener.AddressSet) bool {
_, queryAddr := addresses[ServerDBStatementAddr]
_, driverAddr := addresses[ServerDBTypeAddr]

return queryAddr || driverAddr

}

0 comments on commit 457de24

Please sign in to comment.