-
Notifications
You must be signed in to change notification settings - Fork 25
/
plugin.go
816 lines (710 loc) · 26.6 KB
/
plugin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
// Copyright The OWASP Coraza contributors
// SPDX-License-Identifier: Apache-2.0
package wasmplugin
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math"
"net"
"net/http"
"strconv"
"strings"
"github.com/corazawaf/coraza/v3"
"github.com/corazawaf/coraza/v3/debuglog"
ctypes "github.com/corazawaf/coraza/v3/types"
"github.com/tetratelabs/proxy-wasm-go-sdk/proxywasm"
"github.com/tetratelabs/proxy-wasm-go-sdk/proxywasm/types"
)
// vmContext is the VM context of the plugin. Every callback that is not
// overridden below is served by the embedded types.DefaultVMContext.
type vmContext struct {
	types.DefaultVMContext
}

// NewVMContext returns a new vmContext as a types.VMContext.
func NewVMContext() types.VMContext {
	return new(vmContext)
}

// NewPluginContext returns a new, not yet configured, corazaPlugin. The
// context ID is not used.
func (*vmContext) NewPluginContext(_ uint32) types.PluginContext {
	return new(corazaPlugin)
}
// wafMap associates authorities with the WAF that has to evaluate their
// traffic, and optionally holds a default WAF used for every authority that
// has no dedicated entry.
type wafMap struct {
	// kv maps an authority to its WAF.
	kv map[string]coraza.WAF
	// defaultWAF is the fallback WAF; nil when no default has been set.
	defaultWAF coraza.WAF
}

// newWAFMap returns an empty wafMap whose underlying map is pre-sized for
// capacity entries.
func newWAFMap(capacity int) wafMap {
	return wafMap{
		kv: make(map[string]coraza.WAF, capacity),
	}
}

// put registers waf for the authority key, replacing any previous entry.
// It returns an error when key is empty.
func (m *wafMap) put(key string, waf coraza.WAF) error {
	if len(key) == 0 {
		return errors.New("empty WAF key")
	}

	if m.kv == nil {
		// Writing to a nil map panics: allocate lazily so that a zero-value
		// wafMap (one not built through newWAFMap) is usable as well.
		m.kv = make(map[string]coraza.WAF)
	}

	m.kv[key] = waf
	return nil
}

// setDefaultWAF sets the WAF returned for authorities without a dedicated
// entry. It panics when w is nil.
func (m *wafMap) setDefaultWAF(w coraza.WAF) {
	if w == nil {
		panic("nil WAF set as default")
	}
	m.defaultWAF = w
}

// getWAFOrDefault returns the WAF registered for key. When there is none it
// returns the default WAF and reports true as second value. An error is
// returned when key is unknown and no default WAF has been set.
func (m *wafMap) getWAFOrDefault(key string) (coraza.WAF, bool, error) {
	if w, ok := m.kv[key]; ok {
		return w, false, nil
	}

	if m.defaultWAF == nil {
		return nil, false, errors.New("no default WAF")
	}

	return m.defaultWAF, true, nil
}
// corazaPlugin is the plugin context: it holds the state built once in
// OnPluginStart and handed over to every HTTP context in NewHttpContext.
type corazaPlugin struct {
// Embed the default plugin context here,
// so that we don't need to reimplement all the methods.
types.DefaultPluginContext
// perAuthorityWAFs holds the WAFs built from the plugin configuration,
// keyed by authority, plus the default one.
perAuthorityWAFs wafMap
// metricLabelsKV is the flattened list (key, value, key, value, ...) of the
// metric labels found in the plugin configuration.
metricLabelsKV []string
// metrics is created in OnPluginStart and shared with every HTTP context.
metrics *wafMetrics
}
// OnPluginStart reads and parses the plugin configuration, builds one WAF for
// each set of directives that is actually used (the default directives and
// the ones referenced by at least one authority) and stores the result, along
// with the metric labels and the metrics, in the plugin context.
//
// It returns types.OnPluginStartStatusFailed when the configuration can not
// be read or parsed, when a WAF can not be created or registered, or when an
// authority references directives that have not been declared.
func (ctx *corazaPlugin) OnPluginStart(pluginConfigurationSize int) types.OnPluginStartStatus {
	rawConfig, err := proxywasm.GetPluginConfiguration()
	if err != nil && err != types.ErrorStatusNotFound {
		proxywasm.LogCriticalf("Failed to read plugin configuration: %v", err)
		return types.OnPluginStartStatusFailed
	}

	config, err := parsePluginConfiguration(rawConfig, proxywasm.LogInfo)
	if err != nil {
		proxywasm.LogCriticalf("Failed to parse plugin configuration: %v", err)
		return types.OnPluginStartStatusFailed
	}

	// Invert the authority -> directives name mapping, so that for each
	// directives name we know which authorities reference it. Only referenced
	// directives are turned into a WAF.
	authoritiesByDirectives := make(map[string][]string)
	for authority, directivesName := range config.perAuthorityDirectives {
		authoritiesByDirectives[directivesName] = append(authoritiesByDirectives[directivesName], authority)
	}

	wafs := newWAFMap(len(config.directivesMap))
	for name, directives := range config.directivesMap {
		// The default directives are always initialized, even if no authority
		// references them: they serve the requests that do not belong to any
		// known authority. Their authorities list is left empty on purpose.
		var authorities []string
		if name != config.defaultDirectives {
			referencedBy, referenced := authoritiesByDirectives[name]
			if !referenced {
				// Nobody uses these directives, building a WAF for them would
				// be a waste of resources.
				continue
			}
			authorities = referencedBy
		}

		// TODO(anuraaga): Make the body limits configurable in plugin configuration
		// (WithRequestBodyLimit / WithRequestBodyInMemoryLimit). The limit should be
		// equal to the in-memory limit: TinyGo compilation prevents buffering the
		// request body to files anyways.
		wafConfig := coraza.NewWAFConfig().
			WithErrorCallback(logError).
			WithDebugLogger(debuglog.DefaultWithPrinterFactory(logPrinterFactory)).
			WithRootFS(root).
			WithDirectives(strings.Join(directives, "\n"))

		waf, err := coraza.NewWAF(wafConfig)
		if err != nil {
			proxywasm.LogCriticalf("Failed to parse directives: %v", err)
			return types.OnPluginStartStatusFailed
		}

		// A WAF that is initialized without any authority attached is the
		// default one.
		if len(authorities) == 0 {
			wafs.setDefaultWAF(waf)
		}

		for _, authority := range authorities {
			if err := wafs.put(authority, waf); err != nil {
				proxywasm.LogCriticalf("Failed to register authority WAF: %v", err)
				return types.OnPluginStartStatusFailed
			}
		}

		delete(authoritiesByDirectives, name)
	}

	// Whatever is left has been referenced by an authority but never declared
	// in the directives map.
	if len(authoritiesByDirectives) > 0 {
		for undeclared := range authoritiesByDirectives {
			proxywasm.LogCriticalf("Unknown directives %q", undeclared)
		}
		return types.OnPluginStartStatusFailed
	}

	ctx.perAuthorityWAFs = wafs

	for label, value := range config.metricLabels {
		ctx.metricLabelsKV = append(ctx.metricLabelsKV, label, value)
	}
	ctx.metrics = NewWAFMetrics()

	return types.OnPluginStartStatusOK
}
// NewHttpContext returns the HTTP context that handles the stream identified
// by contextID. The context shares the metrics and the WAFs of the plugin.
func (ctx *corazaPlugin) NewHttpContext(contextID uint32) types.HttpContext {
	labels := ctx.metricLabelsKV

	return &httpContext{
		contextID: contextID,
		metrics:   ctx.metrics,
		// The HTTP context appends its own labels (e.g. the authority) to this
		// slice. Capping the capacity to the length forces that append to
		// allocate a new backing array, instead of writing into the spare
		// capacity of the plugin slice, which is shared by every HTTP context
		// and would make streams overwrite each other's labels.
		metricLabelsKV:   labels[:len(labels):len(labels)],
		perAuthorityWAFs: ctx.perAuthorityWAFs,
	}
}
// interruptionPhase identifies the callback in which a transaction has been
// interrupted. Its zero value means that no interruption has happened.
type interruptionPhase int8

// Known interruption phases, declared as untyped constants numbered from 0.
const (
	interruptionPhaseNone = iota
	interruptionPhaseHttpRequestHeaders
	interruptionPhaseHttpRequestBody
	interruptionPhaseHttpResponseHeaders
	interruptionPhaseHttpResponseBody
)

// isInterrupted reports whether p is anything but interruptionPhaseNone.
func (p interruptionPhase) isInterrupted() bool {
	return p != interruptionPhaseNone
}

// String returns the name of the phase, or "no interruption yet" for
// interruptionPhaseNone and for any value that is not a known phase.
func (p interruptionPhase) String() string {
	names := [...]string{
		interruptionPhaseHttpRequestHeaders:  "http_request_headers",
		interruptionPhaseHttpRequestBody:     "http_request_body",
		interruptionPhaseHttpResponseHeaders: "http_response_headers",
		interruptionPhaseHttpResponseBody:    "http_response_body",
	}

	// Index 0 (no interruption) and out of range values share the fallback.
	if p > interruptionPhaseNone && int(p) < len(names) {
		return names[p]
	}
	return "no interruption yet"
}
// httpContext holds the state of a single HTTP stream: the Coraza transaction
// evaluating it and the bookkeeping shared by the request and response
// callbacks.
type httpContext struct {
// Embed the default http context here,
// so that we don't need to reimplement all the methods.
types.DefaultHttpContext
// contextID is the ID received by NewHttpContext; it is attached to the
// logger as "context_id".
contextID uint32
// perAuthorityWAFs is used in OnHttpRequestHeaders to pick the WAF matching
// the request authority.
perAuthorityWAFs wafMap
// tx is created in OnHttpRequestHeaders. It stays nil when the authority or
// the WAF can not be resolved, hence the nil checks in the other callbacks.
tx ctypes.Transaction
// httpProtocol is the request protocol ("HTTP/2.0" when the property is not
// available); it is reused when processing the response headers.
httpProtocol string
// processedRequestBody is set once the request body phase has been run or
// no further request body data has to be processed.
processedRequestBody bool
// processedResponseBody is the response counterpart of processedRequestBody.
processedResponseBody bool
// bodyReadIndex is the amount of body bytes already read. It is reset at the
// end of the request body and reused for the response body, where it also
// provides the size of the body to replace in case of interruption.
bodyReadIndex int
metrics *wafMetrics
// interruptedAt is the phase in which the transaction has been interrupted,
// interruptionPhaseNone otherwise.
interruptedAt interruptionPhase
// logger is the transaction debug logger enriched with the context fields.
// It is set together with tx.
logger debuglog.Logger
// metricLabelsKV is the flattened list of metric labels; the authority is
// appended to it when the WAF in use is not the default one.
metricLabelsKV []string
}
// OnHttpRequestHeaders resolves the WAF for the request authority, opens the
// transaction and feeds it with connection info, request line and request
// headers. If the evaluation of the request headers phase raises an
// interruption, it is handled and its action is returned.
//
// Any failure retrieving the data needed by the WAF is logged and the request
// is let through (types.ActionContinue).
func (ctx *httpContext) OnHttpRequestHeaders(numHeaders int, endOfStream bool) types.Action {
	defer logTime("OnHttpRequestHeaders", currentTime())

	ctx.metrics.CountTX()

	authority, err := proxywasm.GetHttpRequestHeader(":authority")
	if err != nil {
		proxywasm.LogDebugf("Failed to get the :authority pseudo-header: %v", err)
		// Fall back on the host property exposed by the proxy.
		hostProp, hostPropErr := proxywasm.GetProperty([]string{"request", "host"})
		if hostPropErr != nil {
			proxywasm.LogWarnf("Failed to get the :authority pseudo-header or property of host of the request: %v", hostPropErr)
			return types.ActionContinue
		}
		authority = string(hostProp)
	}

	waf, isDefault, resolveWAFErr := ctx.perAuthorityWAFs.getWAFOrDefault(authority)
	if resolveWAFErr != nil {
		proxywasm.LogWarnf("Failed to resolve WAF for authority %q: %v", authority, resolveWAFErr)
		return types.ActionContinue
	}

	tx := waf.NewTransaction()
	ctx.tx = tx

	// The authority is attached to logs and metrics only when a dedicated WAF
	// is in use.
	logFields := []debuglog.ContextField{debuglog.Uint("context_id", uint(ctx.contextID))}
	if !isDefault {
		logFields = append(logFields, debuglog.Str("authority", authority))
		ctx.metricLabelsKV = append(ctx.metricLabelsKV, "authority", authority)
	}
	ctx.logger = tx.DebugLogger().With(logFields...)

	// CRS rules tend to expect Host even with HTTP/2
	tx.AddRequestHeader("Host", authority)
	tx.SetServerName(parseServerName(ctx.logger, authority))

	// What follows currently relies on Envoy's behavior of mapping all requests
	// to HTTP/2 semantics and on its request properties, which may not hold for
	// other proxies implementing proxy-wasm.
	if tx.IsRuleEngineOff() {
		return types.ActionContinue
	}

	// A failure retrieving IPs or ports does not stop the processing.
	srcIP, srcPort := retrieveAddressInfo(ctx.logger, "source")
	dstIP, dstPort := retrieveAddressInfo(ctx.logger, "destination")
	tx.ProcessConnection(srcIP, srcPort, dstIP, dstPort)

	method, err := proxywasm.GetHttpRequestHeader(":method")
	if err != nil {
		ctx.logger.Error().
			Err(err).
			Msg("Failed to get :method")
		methodProp, methodPropErr := proxywasm.GetProperty([]string{"request", "method"})
		if methodPropErr != nil {
			ctx.logger.Error().
				Err(methodPropErr).
				Msg("Failed to get property of method of the request")
			return types.ActionContinue
		}
		method = string(methodProp)
	}

	// CONNECT requests do not have a path (see
	// https://httpwg.org/specs/rfc9110#CONNECT): the authority is used to
	// build a proper request line.
	uri := authority
	if method != http.MethodConnect {
		// Note the pseudo-header :path includes the query.
		// See https://httpwg.org/specs/rfc9113.html#rfc.section.8.3.1
		uri, err = proxywasm.GetHttpRequestHeader(":path")
		if err != nil {
			ctx.logger.Error().
				Err(err).
				Msg("Failed to get :path")
			pathProp, pathPropErr := proxywasm.GetProperty([]string{"request", "path"})
			if pathPropErr != nil {
				ctx.logger.Error().
					Err(pathPropErr).
					Msg("Failed to get property of path of the request")
				return types.ActionContinue
			}
			uri = string(pathProp)
		}
	}

	// TODO(anuraaga): HTTP protocol is commonly required in WAF rules, we should
	// probably fail fast when it is missing, but proxytest does not support
	// properties yet.
	ctx.httpProtocol = "HTTP/2.0"
	if protocol, protocolErr := proxywasm.GetProperty([]string{"request", "protocol"}); protocolErr == nil {
		ctx.httpProtocol = string(protocol)
	}

	tx.ProcessURI(uri, method, ctx.httpProtocol)

	headers, err := proxywasm.GetHttpRequestHeaders()
	if err != nil {
		ctx.logger.Error().Err(err).Msg("Failed to get request headers")
		return types.ActionContinue
	}
	for _, header := range headers {
		tx.AddRequestHeader(header[0], header[1])
	}

	if interruption := tx.ProcessRequestHeaders(); interruption != nil {
		return ctx.handleInterruption(interruptionPhaseHttpRequestHeaders, interruption)
	}

	return types.ActionContinue
}
// OnHttpRequestBody hands the newly received part of the request body over to
// the transaction and, once the body is complete (or when the request body is
// not accessible at all), runs the request body phase.
//
// It returns types.ActionPause while the body is still being buffered and
// when an interruption has already been handled; the action chosen by
// handleInterruption when an interruption is raised; types.ActionContinue in
// any other case, failures included.
func (ctx *httpContext) OnHttpRequestBody(bodySize int, endOfStream bool) types.Action {
	defer logTime("OnHttpRequestBody", currentTime())

	if ctx.interruptedAt.isInterrupted() {
		ctx.logger.Error().
			Str("interruption_handled_phase", ctx.interruptedAt.String()).
			Msg("Interruption already handled")
		return types.ActionPause
	}

	// Nothing to do if the body phase already ran, there is no transaction, or
	// the rule engine is off.
	if ctx.processedRequestBody || ctx.tx == nil || ctx.tx.IsRuleEngineOff() {
		return types.ActionContinue
	}
	tx := ctx.tx

	if tx.IsRequestBodyAccessible() {
		// bodySize is the size of the whole body received so far, not the size
		// of the current chunk. The callback might be invoked more than once
		// with the same data, so only what has not been read yet is fetched.
		if unread := bodySize - ctx.bodyReadIndex; unread > 0 {
			chunk, err := proxywasm.GetHttpRequestBody(ctx.bodyReadIndex, unread)
			if err != nil {
				ctx.logger.Error().Err(err).
					Int("body_size", bodySize).
					Int("body_read_index", ctx.bodyReadIndex).
					Int("chunk_size", unread).
					Msg("Failed to read request body")
				return types.ActionContinue
			}

			readSize := len(chunk)
			if readSize != unread {
				ctx.logger.Warn().Int("read_chunk_size", readSize).Int("chunk_size", unread).Msg("Request chunk size read is different from the computed one")
			}

			interruption, written, err := tx.WriteRequestBody(chunk)
			if err != nil {
				ctx.logger.Error().Err(err).Msg("Failed to write request body")
				return types.ActionContinue
			}
			if interruption != nil {
				return ctx.handleInterruption(interruptionPhaseHttpRequestBody, interruption)
			}

			// A partial write implicitly means that the WAF request body limit
			// has been reached: ProcessRequestBody has been called internally
			// and did not raise any interruption (checked right above).
			if written < readSize {
				// No further body data will be processed, and the flag avoids
				// calling ProcessRequestBody more than once.
				ctx.processedRequestBody = true
				return types.ActionContinue
			}

			ctx.bodyReadIndex += readSize
		}

		if !endOfStream {
			// Keep buffering until the whole body has been received.
			return types.ActionPause
		}
		ctx.bodyReadIndex = 0 // cleaning for further usage
	} else {
		// SecRequestBodyAccess is off: no body data is read, but the phase is
		// still run for phase 2 rules checking already populated variables.
		ctx.logger.Debug().Msg("Skipping request body inspection, SecRequestBodyAccess is off.")
	}

	ctx.processedRequestBody = true
	interruption, err := tx.ProcessRequestBody()
	if err != nil {
		ctx.logger.Error().
			Err(err).
			Msg("Failed to process request body")
		return types.ActionContinue
	}
	if interruption != nil {
		return ctx.handleInterruption(interruptionPhaseHttpRequestBody, interruption)
	}

	return types.ActionContinue
}
// OnHttpResponseHeaders runs the request body phase if it has not been run
// yet, then feeds the transaction with status code and response headers and
// evaluates the response headers phase. If an interruption is raised, it is
// handled and its action is returned; in any other case, failures included,
// the response is let through.
func (ctx *httpContext) OnHttpResponseHeaders(numHeaders int, endOfStream bool) types.Action {
	defer logTime("OnHttpResponseHeaders", currentTime())

	if ctx.interruptedAt.isInterrupted() {
		// Handling an interruption (see handleInterruption) generates a local
		// response carrying the interruption status code. When that happens
		// during OnHttpRequestHeaders or OnHttpRequestBody, the crafted
		// response goes downstream through the filter chain, so this callback
		// is invoked and has to continue to properly send it back.
		// A doublecheck might be eventually added, checking that the :status
		// header matches the expected interruption status code.
		// See https://github.com/corazawaf/coraza-proxy-wasm/pull/126
		ctx.logger.Debug().
			Str("interruption_handled_phase", ctx.interruptedAt.String()).
			Msg("Interruption already handled, sending downstream the local response")
		return types.ActionContinue
	}

	if ctx.tx == nil || ctx.tx.IsRuleEngineOff() {
		return types.ActionContinue
	}
	tx := ctx.tx

	// Requests without body won't call OnHttpRequestBody, but there are rules
	// in the request body phase that still need to be executed. If they
	// haven't been executed yet, now is the time.
	if !ctx.processedRequestBody {
		ctx.processedRequestBody = true
		interruption, err := tx.ProcessRequestBody()
		if err != nil {
			ctx.logger.Error().
				Err(err).Msg("Failed to process request body")
			return types.ActionContinue
		}
		if interruption != nil {
			return ctx.handleInterruption(interruptionPhaseHttpResponseHeaders, interruption)
		}
	}

	status, err := proxywasm.GetHttpResponseHeader(":status")
	if err != nil {
		ctx.logger.Error().
			Err(err).
			Msg("Failed to get :status")
		// NOTE(review): the property is converted as if it were a decimal
		// string; Envoy may expose response.code as a binary integer, in which
		// case the conversion below fails and the code is 0 - to be confirmed.
		codeProp, codePropErr := proxywasm.GetProperty([]string{"response", "code"})
		if codePropErr != nil {
			ctx.logger.Error().
				Err(codePropErr).
				Msg("Failed to get property of code of the response")
			return types.ActionContinue
		}
		status = string(codeProp)
	}

	// A status that can not be converted results in code 0.
	code := 0
	if parsed, convErr := strconv.Atoi(status); convErr == nil {
		code = parsed
	}

	headers, err := proxywasm.GetHttpResponseHeaders()
	if err != nil {
		ctx.logger.Error().
			Err(err).
			Msg("Failed to get response headers")
		return types.ActionContinue
	}
	for _, header := range headers {
		tx.AddResponseHeader(header[0], header[1])
	}

	if interruption := tx.ProcessResponseHeaders(code, ctx.httpProtocol); interruption != nil {
		return ctx.handleInterruption(interruptionPhaseHttpResponseHeaders, interruption)
	}

	return types.ActionContinue
}
// OnHttpResponseBody hands the newly received part of the response body over
// to the transaction and, once the body is complete (or when the response body
// is not to be inspected), runs the response body phase.
//
// It returns types.ActionPause while the body is being buffered. At this point
// the response headers have already been sent, so an interruption can only be
// enforced by replacing the body: this is done both when the interruption is
// raised and on every later invocation of the callback.
func (ctx *httpContext) OnHttpResponseBody(bodySize int, endOfStream bool) types.Action {
	defer logTime("OnHttpResponseBody", currentTime())

	if ctx.interruptedAt.isInterrupted() {
		// At response body phase, proxy-wasm currently relies on emptying the
		// response body as a way of interrupting the response.
		// See https://github.com/corazawaf/coraza-proxy-wasm/issues/26.
		// Being called again after an interruption means that the sanitization
		// of the response has to keep going, as no body data has to be sent.
		// The crafted response with empty body is not expected to trigger this
		// callback.
		ctx.logger.Debug().
			Str("interruption_handled_phase", ctx.interruptedAt.String()).
			Msg("Response body interruption already handled, keeping replacing the body")
		return replaceResponseBodyWhenInterrupted(ctx.logger, bodySize)
	}

	// Nothing to do if the body phase already ran, there is no transaction, or
	// the rule engine is off.
	if ctx.processedResponseBody || ctx.tx == nil || ctx.tx.IsRuleEngineOff() {
		return types.ActionContinue
	}
	tx := ctx.tx

	// No body data is read if the response body is not accessible or not
	// processable.
	if !(tx.IsResponseBodyAccessible() && tx.IsResponseBodyProcessable()) {
		ctx.logger.Debug().Bool("SecResponseBodyAccess", tx.IsResponseBodyAccessible()).
			Bool("IsResponseBodyProcessable", tx.IsResponseBodyProcessable()).
			Msg("Skipping response body inspection")

		// The phase is still run for phase 4 rules checking already populated
		// variables.
		interruption, err := tx.ProcessResponseBody()
		if err != nil {
			ctx.logger.Error().Err(err).Msg("Failed to process response body")
			return types.ActionContinue
		}
		ctx.processedResponseBody = true
		if interruption == nil {
			return types.ActionContinue
		}

		// Proxy-wasm can not deny the response anymore, the best interruption
		// is emptying the body. Coraza multiphase evaluation helps avoiding
		// such late interruptions.
		ctx.bodyReadIndex = bodySize // hacky: bodyReadIndex stores the body size that has to be replaced
		return ctx.handleInterruption(interruptionPhaseHttpResponseBody, interruption)
	}

	if unread := bodySize - ctx.bodyReadIndex; unread > 0 {
		chunk, err := proxywasm.GetHttpResponseBody(ctx.bodyReadIndex, unread)
		if err != nil {
			ctx.logger.Error().
				Int("body_size", bodySize).
				Int("body_read_index", ctx.bodyReadIndex).
				Int("chunk_size", unread).
				Err(err).
				Msg("Failed to read response body")
			return types.ActionContinue
		}

		readSize := len(chunk)
		if readSize != unread {
			ctx.logger.Warn().Int("read_chunk_size", readSize).Int("chunk_size", unread).Msg("Response chunk size read is different from the computed one")
		}

		interruption, written, err := tx.WriteResponseBody(chunk)
		if err != nil {
			ctx.logger.Error().Err(err).Msg("Failed to write response body")
			return types.ActionContinue
		}

		// bodyReadIndex has to be updated before evaluating the interruption:
		// it provides the size of the body to replace if the transaction is
		// interrupted.
		ctx.bodyReadIndex += readSize
		if interruption != nil {
			return ctx.handleInterruption(interruptionPhaseHttpResponseBody, interruption)
		}

		// A partial write implicitly means that the WAF response body limit
		// has been reached: ProcessResponseBody has been called internally and
		// did not raise any interruption (checked right above).
		if written < readSize {
			// no further body data will be processed
			ctx.processedResponseBody = true
			return types.ActionContinue
		}
	}

	if !endOfStream {
		// Wait until we see the entire body. It has to be buffered in order to
		// check that it is fully legit before sending it downstream (to the
		// client).
		return types.ActionPause
	}

	// Response headers are already sent, an unauthorized response can not be
	// sent anymore, but the response body can still be dropped to prevent
	// leaking sensitive content. The error will also be logged by Coraza.
	ctx.processedResponseBody = true
	interruption, err := tx.ProcessResponseBody()
	if err != nil {
		ctx.logger.Error().
			Err(err).
			Msg("Failed to process response body")
		return types.ActionContinue
	}
	if interruption != nil {
		return ctx.handleInterruption(interruptionPhaseHttpResponseBody, interruption)
	}

	return types.ActionContinue
}
// OnHttpStreamDone finalizes the transaction, if any: it runs the response
// body phase when still pending (for detection only), then the logging phase,
// and finally closes the transaction.
func (ctx *httpContext) OnHttpStreamDone() {
	defer logTime("OnHttpStreamDone", currentTime())

	tx := ctx.tx
	if tx == nil {
		return
	}

	// Responses without body won't call OnHttpResponseBody, but there are rules
	// in the response body phase that still need to be executed. If they
	// haven't been executed yet, and there has not been a previous
	// interruption, now is the time.
	if !tx.IsRuleEngineOff() && !ctx.interruptedAt.isInterrupted() && !ctx.processedResponseBody {
		ctx.logger.Info().Msg("Running ProcessResponseBody in OnHttpStreamDone, triggered actions will not be enforced. Further logs are for detection only purposes")
		ctx.processedResponseBody = true
		// The interruption, if any, is discarded: it can not be enforced anymore.
		if _, err := tx.ProcessResponseBody(); err != nil {
			ctx.logger.Error().
				Err(err).
				Msg("Failed to process response body")
		}
	}

	// ProcessLogging is still called even if the rule engine is off, for
	// potential logs generated before the engine was turned off. Internally,
	// if the engine is off, no log phase rules are evaluated.
	tx.ProcessLogging()

	if err := tx.Close(); err != nil {
		ctx.logger.Error().Err(err).Msg("Failed to close transaction")
	}

	ctx.logger.Info().Msg("Finished")
	logMemStats()
}
const (
	// noGRPCStream is the gRPC status passed to proxywasm.SendHttpResponse
	// when sending the local response of an interruption.
	noGRPCStream int32 = -1

	// defaultInterruptionStatusCode is the HTTP status code of the local
	// response when the interruption does not carry one.
	defaultInterruptionStatusCode int = 403
)
// handleInterruption enforces the interruption raised at the given phase and
// returns the action that the calling callback has to return.
//
// The interruption is counted in the metrics, logged and recorded in
// ctx.interruptedAt. At response body phase the body is replaced; in any
// other phase a local response is sent with the interruption status code
// (defaultInterruptionStatusCode when it is 0).
//
// It panics if an interruption has already been handled for this context, or
// if the local response can not be sent.
func (ctx *httpContext) handleInterruption(phase interruptionPhase, interruption *ctypes.Interruption) types.Action {
	if ctx.interruptedAt.isInterrupted() {
		// handleInterruption should never be called more than once
		panic("Interruption already handled")
	}

	phaseName := phase.String()
	ctx.metrics.CountTXInterruption(phaseName, interruption.RuleID, ctx.metricLabelsKV)

	ctx.logger.Info().
		Str("action", interruption.Action).
		Str("phase", phaseName).
		Msg("Transaction interrupted")

	ctx.interruptedAt = phase

	// Response headers are already gone: only the body can be tampered with.
	// ctx.bodyReadIndex holds the size of the body that has to be replaced.
	if phase == interruptionPhaseHttpResponseBody {
		return replaceResponseBodyWhenInterrupted(ctx.logger, ctx.bodyReadIndex)
	}

	statusCode := defaultInterruptionStatusCode
	if interruption.Status != 0 {
		statusCode = interruption.Status
	}

	if err := proxywasm.SendHttpResponse(uint32(statusCode), nil, nil, noGRPCStream); err != nil {
		panic(err)
	}

	// SendHttpResponse must be followed by ActionPause in order to stop malicious content
	return types.ActionPause
}
// logError is the WAF error callback: it writes the error log of the matched
// rule to the proxy log, at the level corresponding to the rule severity.
//
// Emergency, alert and critical are logged as critical; error as error;
// warning as warning; notice and info as info; debug as debug. Any other
// severity is not logged.
func logError(rule ctypes.MatchedRule) {
	// The parameter is deliberately not named "error", which would shadow the
	// predeclared error type within the function.
	msg := rule.ErrorLog()

	switch rule.Rule().Severity() {
	case ctypes.RuleSeverityEmergency,
		ctypes.RuleSeverityAlert,
		ctypes.RuleSeverityCritical:
		proxywasm.LogCritical(msg)
	case ctypes.RuleSeverityError:
		proxywasm.LogError(msg)
	case ctypes.RuleSeverityWarning:
		proxywasm.LogWarn(msg)
	case ctypes.RuleSeverityNotice,
		ctypes.RuleSeverityInfo:
		proxywasm.LogInfo(msg)
	case ctypes.RuleSeverityDebug:
		proxywasm.LogDebug(msg)
	}
}
// retrieveAddressInfo returns IP and port of the given target, read from the
// properties exposed by the proxy. Expected targets are "source" and
// "destination".
//
// The IP comes from the {target, "address"} property. The port comes from the
// {target, "port"} property or, when that property is not available, from the
// port embedded in the address. Failures are only logged at debug level:
// whatever could not be retrieved is returned as zero value.
//
// Envoy ref: https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/advanced/attributes#connection-attributes
func retrieveAddressInfo(logger debuglog.Logger, target string) (string, int) {
	var (
		ip, addressPort string
		port            int
	)

	if rawAddress, err := proxywasm.GetProperty([]string{target, "address"}); err != nil {
		logger.Debug().
			Err(err).
			Msg(fmt.Sprintf("Failed to get %s address", target))
	} else if ip, addressPort, err = net.SplitHostPort(string(rawAddress)); err != nil {
		logger.Debug().
			Err(err).
			Msg(fmt.Sprintf("Failed to parse %s address", target))
	}

	rawPort, err := proxywasm.GetProperty([]string{target, "port"})
	switch {
	case err == nil:
		if port, err = parsePort(rawPort); err != nil {
			logger.Debug().
				Err(err).
				Msg(fmt.Sprintf("Failed to parse %s port", target))
		}
	case addressPort != "":
		// The port property is not available: rely on the port found inside
		// the address property. Mostly useful for proxies other than Envoy.
		if port, err = strconv.Atoi(addressPort); err != nil {
			logger.Debug().
				Err(err).
				Msg(fmt.Sprintf("Failed to get %s port", target))
		}
	}

	return ip, port
}
// parsePort decodes a port received as little-endian bytes.
//
// The port attribute ({"source", "port"}) is populated as uint64 (8 bytes), see
// https://github.com/envoyproxy/envoy/blob/1b3da361279a54956f01abba830fc5d3a5421828/source/common/network/utility.cc#L201
// Only the first 8 bytes of b are decoded. An error is returned when b is
// shorter than that, or when the decoded value is greater than math.MaxInt32.
func parsePort(b []byte) (int, error) {
	const encodedSize = 8 // size in bytes of a uint64
	if len(b) < encodedSize {
		return 0, errors.New("port bytes not found")
	}

	// 0 < port number <= 65535: a valid value never exceeds 16 bits and always
	// fits an int, which is at least 32 bits in size.
	if decoded := binary.LittleEndian.Uint64(b); decoded <= math.MaxInt32 {
		return int(decoded), nil
	}

	return 0, errors.New("port conversion error")
}
// replaceResponseBodyWhenInterrupted addresses an interruption raised during
// phase 4. At this phase the response headers are already sent downstream,
// therefore an interruption can not change the status code anymore, but only
// tweak the response body: the body is replaced with bodySize null bytes.
//
// It always returns types.ActionContinue, whether or not the replacement
// succeeds.
func replaceResponseBodyWhenInterrupted(logger debuglog.Logger, bodySize int) types.Action {
	// TODO(M4tteoP): Update response body interruption logic after https://github.com/corazawaf/coraza-proxy-wasm/issues/26
	// Currently the potentially leaked sensitive data is overwritten with a
	// body filled with null bytes.
	nullBody := bytes.Repeat([]byte{0}, bodySize)

	if err := proxywasm.ReplaceHttpResponseBody(nullBody); err != nil {
		logger.Error().Err(err).Msg("Failed to replace response body")
	} else {
		logger.Warn().Msg("Response body intervention occurred: body replaced")
	}

	return types.ActionContinue
}
// parseServerName extracts the virtual host from the :authority pseudo-header
// by stripping the port. When authority can not be split into host and port
// (missing port or bad format) it is returned as is.
func parseServerName(logger debuglog.Logger, authority string) string {
	host, _, err := net.SplitHostPort(authority)
	if err == nil {
		return host
	}

	// missing port or bad format
	logger.Debug().
		Err(err).
		Msg("Failed to parse server name from authority")

	return authority
}