From 26acf4313e4ed32ad88e907f5575d15e5f56ce0b Mon Sep 17 00:00:00 2001 From: Tudor Golubenco Date: Tue, 6 Sep 2016 15:52:20 +0200 Subject: [PATCH 1/3] Improve handling of messages larger than 10MB Old implementation dropped messages larger than 10MB (hard coded), to protect against memory DoS. However, in some cases the transaction was still recorded, but the parsing of the headers was incomplete. This makes the support for large messages explicit, by adding a mode to the parser that "sees" the segments without storing them. --- packetbeat/protos/http/http.go | 15 ++-- packetbeat/protos/http/http_parser.go | 53 ++++++++++++-- packetbeat/protos/http/http_test.go | 100 ++++++++++++++++++++++---- 3 files changed, 141 insertions(+), 27 deletions(-) diff --git a/packetbeat/protos/http/http.go b/packetbeat/protos/http/http.go index cfab0d9e7b16..4ac797d568b0 100644 --- a/packetbeat/protos/http/http.go +++ b/packetbeat/protos/http/http.go @@ -265,19 +265,21 @@ func (http *HTTP) doParse( detailedf("Payload received: [%s]", pkt.Payload) } + extraMsgSize := 0 // size of a "seen" packet for which we don't store the actual bytes + st := conn.Streams[dir] if st == nil { st = newStream(pkt, tcptuple) conn.Streams[dir] = st } else { // concatenate bytes - st.data = append(st.data, pkt.Payload...) - if len(st.data) > tcp.TCP_MAX_DATA_IN_STREAM { + if len(st.data)+len(pkt.Payload) > tcp.TCP_MAX_DATA_IN_STREAM { if isDebug { - debugf("Stream data too large, dropping TCP stream") + debugf("Stream data too large, ignoring message") } - conn.Streams[dir] = nil - return conn + extraMsgSize = len(pkt.Payload) + } else { + st.data = append(st.data, pkt.Payload...) } } @@ -287,7 +289,7 @@ func (http *HTTP) doParse( } parser := newParser(&http.parserConfig) - ok, complete := parser.parse(st) + ok, complete := parser.parse(st, extraMsgSize) if !ok { // drop this tcp stream. Will retry parsing with the next // segment in it @@ -322,6 +324,7 @@ func newStream(pkt *protos.Packet, tcptuple *common.TcpTuple) *stream { func (http *HTTP) ReceivedFin(tcptuple *common.TcpTuple, dir uint8, private protos.ProtocolData) protos.ProtocolData { + debugf("Received FIN") conn := getHTTPConnection(private) if conn == nil { return private diff --git a/packetbeat/protos/http/http_parser.go b/packetbeat/protos/http/http_parser.go index 4f9021a8c463..f48bf692d966 100644 --- a/packetbeat/protos/http/http_parser.go +++ b/packetbeat/protos/http/http_parser.go @@ -16,7 +16,6 @@ type message struct { Ts time.Time hasContentLength bool headerOffset int - bodyOffset int version version connection common.NetString chunkedLength int @@ -46,9 +45,10 @@ type message struct { Notes []string - //Timing - start int - end int + //Offsets + start int + end int + bodyOffset int next *message } @@ -87,9 +87,18 @@ func newParser(config *parserConfig) *parser { return &parser{config: config} } -func (parser *parser) parse(s *stream) (bool, bool) { +func (parser *parser) parse(s *stream, extraMsgSize int) (bool, bool) { m := s.message + if extraMsgSize > 0 { + // A packet of extraMsgSize size was seen, but we don't have + // its actual bytes. This is only usable in the `stateBody` state. + if s.parseState != stateBody { + return false, false + } + return parser.eatBody(s, m, extraMsgSize) + } + for s.parseOffset < len(s.data) { switch s.parseState { case stateStart: @@ -363,14 +372,14 @@ func (parser *parser) parseHeader(m *message, data []byte) (bool, bool, int) { func (*parser) parseBody(s *stream, m *message) (ok, complete bool) { if isDebug { - debugf("eat body: %d", s.parseOffset) + debugf("parseBody body: %d", s.parseOffset) } if !m.hasContentLength && (bytes.Equal(m.connection, constClose) || (isVersion(m.version, 1, 0) && !bytes.Equal(m.connection, constKeepAlive))) { // HTTP/1.0 no content length. Add until the end of the connection if isDebug { - debugf("close connection, %d", len(s.data)-s.parseOffset) + debugf("http conn close, received %d", len(s.data)-s.parseOffset) } s.bodyReceived += (len(s.data) - s.parseOffset) m.ContentLength += (len(s.data) - s.parseOffset) @@ -391,6 +400,36 @@ func (*parser) parseBody(s *stream, m *message) (ok, complete bool) { } } +// eatBody acts as if size bytes were received, without having access to +// those bytes. +func (*parser) eatBody(s *stream, m *message, size int) (ok, complete bool) { + if isDebug { + debugf("eatBody body: %d", s.parseOffset) + } + if !m.hasContentLength && (bytes.Equal(m.connection, constClose) || + (isVersion(m.version, 1, 0) && !bytes.Equal(m.connection, constKeepAlive))) { + + // HTTP/1.0 no content length. Add until the end of the connection + if isDebug { + debugf("http conn close, received %d", size) + } + s.bodyReceived += size + m.ContentLength += size + return true, false + } else if size >= m.ContentLength-s.bodyReceived { + s.bodyReceived += (m.ContentLength - s.bodyReceived) + m.end = s.parseOffset + m.Size = uint64(m.bodyOffset-m.start) + uint64(m.ContentLength) + return true, true + } else { + s.bodyReceived += size + if isDebug { + debugf("bodyReceived: %d", s.bodyReceived) + } + return true, false + } +} + func (*parser) parseBodyChunkedStart(s *stream, m *message) (cont, ok, complete bool) { // read hexa length i := bytes.Index(s.data[s.parseOffset:], constCRLF) diff --git a/packetbeat/protos/http/http_test.go b/packetbeat/protos/http/http_test.go index 1a164fc75e54..8168a6ea4407 100644 --- a/packetbeat/protos/http/http_test.go +++ b/packetbeat/protos/http/http_test.go @@ -45,7 +45,7 @@ func (tp *testParser) parse() (*message, bool, bool) { } parser := newParser(&tp.http.parserConfig) - ok, complete := parser.parse(st) + ok, complete := parser.parse(st, 0) return st.message, ok, complete } @@ -63,9 +63,9 @@ func testParse(http *HTTP, data string) (*message, bool, bool) { return tp.parse() } -func testParseStream(http *HTTP, st *stream) (bool, bool) { +func testParseStream(http *HTTP, st *stream, extraLen int) (bool, bool) { parser := newParser(&http.parserConfig) - return parser.parse(st) + return parser.parse(st, extraLen) } func TestHttpParser_simpleResponse(t *testing.T) { @@ -165,8 +165,80 @@ func TestHttpParser_Request_ContentLength_0(t *testing.T) { assert.True(t, complete) } +func TestHttpParser_eatBody(t *testing.T) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"http", "httpdetailed"}) + } + + http := httpModForTests() + http.parserConfig.SendHeaders = true + http.parserConfig.SendAllHeaders = true + + data := []byte("POST / HTTP/1.1\r\n" + + "user-agent: curl/7.35.0\r\n" + + "host: localhost:9000\r\n" + + "accept: */*\r\n" + + "authorization: Company 1\r\n" + + "content-length: 20\r\n" + + "connection: close\r\n" + + "\r\n" + + "0123456789") + + st := &stream{data: data, message: new(message)} + ok, complete := testParseStream(http, st, 0) + assert.True(t, ok) + assert.False(t, complete) + assert.Equal(t, st.bodyReceived, 10) + + ok, complete = testParseStream(http, st, 5) + assert.True(t, ok) + assert.False(t, complete) + assert.Equal(t, st.bodyReceived, 15) + + ok, complete = testParseStream(http, st, 5) + assert.True(t, ok) + assert.True(t, complete) + assert.Equal(t, st.bodyReceived, 20) + assert.Equal(t, st.message.end, len(data)) +} + +func TestHttpParser_eatBody_connclose(t *testing.T) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"http", "httpdetailed"}) + } + + http := httpModForTests() + http.parserConfig.SendHeaders = true + http.parserConfig.SendAllHeaders = true + + data := []byte("HTTP/1.1 200 ok\r\n" + + "user-agent: curl/7.35.0\r\n" + + "host: localhost:9000\r\n" + + "accept: */*\r\n" + + "authorization: Company 1\r\n" + + "connection: close\r\n" + + "\r\n" + + "0123456789") + + st := &stream{data: data, message: new(message)} + ok, complete := testParseStream(http, st, 0) + assert.True(t, ok) + assert.False(t, complete) + assert.Equal(t, st.bodyReceived, 10) + + ok, complete = testParseStream(http, st, 5) + assert.True(t, ok) + assert.False(t, complete) + assert.Equal(t, st.bodyReceived, 15) + + ok, complete = testParseStream(http, st, 5) + assert.True(t, ok) + assert.False(t, complete) + assert.Equal(t, st.bodyReceived, 20) +} + func TestHttpParser_splitResponse(t *testing.T) { - data1 := "HTTP/1.1 200 OK\r\n" + + data1 := "HTTP/1.1 200 ok\r\n" + "Date: Tue, 14 Aug 2012 22:31:45 GMT\r\n" + "Expires: -1\r\n" + "Cache-Control: private, max-age=0\r\n" + @@ -672,7 +744,7 @@ func TestHttpParser_censorPasswordGET(t *testing.T) { st := &stream{data: data1, message: new(message)} - ok, complete := testParseStream(http, st) + ok, complete := testParseStream(http, st, 0) if !ok { t.Errorf("Parsing returned error") } @@ -722,7 +794,7 @@ func TestHttpParser_RedactAuthorization(t *testing.T) { st := &stream{data: data, message: new(message)} - ok, _ := testParseStream(http, st) + ok, _ := testParseStream(http, st, 0) st.message.Raw = st.data[st.message.start:] http.hideHeaders(st.message) @@ -759,7 +831,7 @@ func TestHttpParser_RedactAuthorization_raw(t *testing.T) { st := &stream{data: data, message: new(message)} - ok, complete := testParseStream(http, st) + ok, complete := testParseStream(http, st, 0) st.message.Raw = st.data[st.message.start:] http.hideHeaders(st.message) @@ -796,7 +868,7 @@ func TestHttpParser_RedactAuthorization_Proxy_raw(t *testing.T) { st := &stream{data: data, message: new(message)} - ok, complete := testParseStream(http, st) + ok, complete := testParseStream(http, st, 0) st.message.Raw = st.data[st.message.start:] http.hideHeaders(st.message) @@ -905,7 +977,7 @@ func Test_gap_in_headers(t *testing.T) { "Content-Type: text/html; charset=UTF-8\r\n") st := &stream{data: data1, message: new(message)} - ok, complete := testParseStream(http, st) + ok, complete := testParseStream(http, st, 0) assert.Equal(t, true, ok) assert.Equal(t, false, complete) @@ -934,7 +1006,7 @@ func Test_gap_in_body(t *testing.T) { "xxxxxxxxxxxxxxxxxxxx") st := &stream{data: data1, message: new(message)} - ok, complete := testParseStream(http, st) + ok, complete := testParseStream(http, st, 0) assert.Equal(t, true, ok) assert.Equal(t, false, complete) @@ -966,7 +1038,7 @@ func Test_gap_in_body_http1dot0(t *testing.T) { "xxxxxxxxxxxxxxxxxxxx") st := &stream{data: data1, message: new(message)} - ok, complete := testParseStream(http, st) + ok, complete := testParseStream(http, st, 0) assert.Equal(t, true, ok) assert.Equal(t, false, complete) @@ -1100,7 +1172,7 @@ func benchmarkHTTPMessage(b *testing.B, data []byte) { for i := 0; i < b.N; i++ { stream := &stream{data: data, message: new(message)} - ok, complete := parser.parse(stream) + ok, complete := parser.parse(stream, 0) if !ok || !complete { b.Errorf("failed to parse message") } @@ -1159,13 +1231,13 @@ func BenchmarkHTTPSplitResponse(b *testing.B) { for i := 0; i < b.N; i++ { stream := &stream{data: data1, message: new(message)} - ok, complete := parser.parse(stream) + ok, complete := parser.parse(stream, 0) if !ok || complete { b.Errorf("parse failure. Expected message to be incomplete, but no parse failures") } stream.data = append(stream.data, data2...) - ok, complete = parser.parse(stream) + ok, complete = parser.parse(stream, 0) if !ok || !complete { b.Errorf("failed to parse message") } From b4dcb32ba4448e4d97952e6981d5069c23a153bc Mon Sep 17 00:00:00 2001 From: Tudor Golubenco Date: Thu, 8 Sep 2016 12:43:15 +0200 Subject: [PATCH 2/3] Make the max_message_size configurable * It used to be hardcoded to 10 MB * Added a system test for the case of messages larger than max_message_size --- CHANGELOG.asciidoc | 2 ++ .../configuration/packetbeat-options.asciidoc | 6 +++++ packetbeat/etc/beat.full.yml | 4 ++++ packetbeat/packetbeat.full.yml | 4 ++++ packetbeat/protos/http/config.go | 3 +++ packetbeat/protos/http/http.go | 4 +++- .../tests/system/config/packetbeat.yml.j2 | 1 + .../tests/system/pcaps/http_get_2k_file.pcap | Bin 0 -> 4821 bytes .../tests/system/test_0063_http_body.py | 22 ++++++++++++++++++ 9 files changed, 45 insertions(+), 1 deletion(-) create mode 100644 packetbeat/tests/system/pcaps/http_get_2k_file.pcap diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 4a231243d41f..fca67c65f650 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d *Packetbeat* - Fix mapping for some Packetbeat flow metrics that were not marked as being longs. {issue}2177[2177] +- Fix handling of messages larger than the maximum message size (10MB). {pull}2470[2470] *Topbeat* @@ -101,6 +102,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d - Add cassandra protocol analyzer to packetbeat. {pull}1959[1959] - Match connections with IPv6 addresses to processes {pull}2254[2254] - Add IP address to -devices command output {pull}2327[2327] +- Add configuration option for the maximum message size. Used to be hard-coded to 10 MB. {pull}2470[2470] *Topbeat* diff --git a/packetbeat/docs/reference/configuration/packetbeat-options.asciidoc b/packetbeat/docs/reference/configuration/packetbeat-options.asciidoc index cef30b2e73bb..f774204d7f85 100644 --- a/packetbeat/docs/reference/configuration/packetbeat-options.asciidoc +++ b/packetbeat/docs/reference/configuration/packetbeat-options.asciidoc @@ -501,6 +501,12 @@ information. If this header is present and contains a valid IP addresses, the information is used for the `real_ip` and `client_location` indexed fields. +===== max_message_size + +If an individual HTTP message is larger than this setting (in bytes), it will be trimmed +to this size. Unless this value is very small (<1.5K), Packetbeat is able to still correctly +follow the transaction and create an event for it. The default is 10485760 (10 MB). + ==== AMQP Configuration Options diff --git a/packetbeat/etc/beat.full.yml b/packetbeat/etc/beat.full.yml index 079e4414bdce..32f421e8b329 100644 --- a/packetbeat/etc/beat.full.yml +++ b/packetbeat/etc/beat.full.yml @@ -205,6 +205,10 @@ packetbeat.protocols.http: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Maximum message size. If an HTTP message is larger than this, it will + # be trimmed to this size. Default is 10 MB. + #max_message_size: 10485760 + packetbeat.protocols.memcache: # Enable memcache monitoring. Default: true #enabled: true diff --git a/packetbeat/packetbeat.full.yml b/packetbeat/packetbeat.full.yml index 14910b624367..e1928247b5c4 100644 --- a/packetbeat/packetbeat.full.yml +++ b/packetbeat/packetbeat.full.yml @@ -205,6 +205,10 @@ packetbeat.protocols.http: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Maximum message size. If an HTTP message is larger than this, it will + # be trimmed to this size. Default is 10 MB. + #max_message_size: 10485760 + packetbeat.protocols.memcache: # Enable memcache monitoring. Default: true #enabled: true diff --git a/packetbeat/protos/http/config.go b/packetbeat/protos/http/config.go index 71a256a33c04..fa1973bce385 100644 --- a/packetbeat/protos/http/config.go +++ b/packetbeat/protos/http/config.go @@ -3,6 +3,7 @@ package http import ( "github.com/elastic/beats/packetbeat/config" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/protos/tcp" ) type httpConfig struct { @@ -14,6 +15,7 @@ type httpConfig struct { Include_body_for []string `config:"include_body_for"` Hide_keywords []string `config:"hide_keywords"` Redact_authorization bool `config:"redact_authorization"` + MaxMessageSize int `config:"max_message_size"` } var ( @@ -21,5 +23,6 @@ var ( ProtocolCommon: config.ProtocolCommon{ TransactionTimeout: protos.DefaultTransactionExpiration, }, + MaxMessageSize: tcp.TCP_MAX_DATA_IN_STREAM, } ) diff --git a/packetbeat/protos/http/http.go b/packetbeat/protos/http/http.go index 4ac797d568b0..70e7d2f3a86f 100644 --- a/packetbeat/protos/http/http.go +++ b/packetbeat/protos/http/http.go @@ -68,6 +68,7 @@ type HTTP struct { HideKeywords []string RedactAuthorization bool IncludeBodyFor []string + MaxMessageSize int parserConfig parserConfig @@ -124,6 +125,7 @@ func (http *HTTP) setFromConfig(config *httpConfig) { http.parserConfig.RealIPHeader = strings.ToLower(config.Real_ip_header) http.transactionTimeout = config.TransactionTimeout http.IncludeBodyFor = config.Include_body_for + http.MaxMessageSize = config.MaxMessageSize if config.Send_all_headers { http.parserConfig.SendHeaders = true @@ -273,7 +275,7 @@ func (http *HTTP) doParse( conn.Streams[dir] = st } else { // concatenate bytes - if len(st.data)+len(pkt.Payload) > tcp.TCP_MAX_DATA_IN_STREAM { + if len(st.data)+len(pkt.Payload) > http.MaxMessageSize { if isDebug { debugf("Stream data too large, ignoring message") } diff --git a/packetbeat/tests/system/config/packetbeat.yml.j2 b/packetbeat/tests/system/config/packetbeat.yml.j2 index 79db5b9bf83d..916f4ee672fd 100644 --- a/packetbeat/tests/system/config/packetbeat.yml.j2 +++ b/packetbeat/tests/system/config/packetbeat.yml.j2 @@ -75,6 +75,7 @@ packetbeat.protocols.http: {%- endfor -%} ] {%- endif %} +{%- if http_max_message_size %} max_message_size: {{ http_max_message_size }} {%- endif %} packetbeat.protocols.memcache: ports: [{{ memcache_ports|default([11211])|join(", ") }}] diff --git a/packetbeat/tests/system/pcaps/http_get_2k_file.pcap b/packetbeat/tests/system/pcaps/http_get_2k_file.pcap new file mode 100644 index 0000000000000000000000000000000000000000..7f0d8069778af7285bb6f58fa6b6a5c8b5e3a91b GIT binary patch literal 4821 zcmeH~UuaWT9LG=6KS)Yu>ms!1>=i^@Fv(5qKkq}ip(g%mI}1(KId#VLw!IjWm|lxP z$2R7Ru#P=Ue9#Ad5?@9szK9Ogf)qCd+0coy$=FLLj?VvzBz{ls>Fqt{L|)52-*Y)R=eMcJ(@s)N=01gjd3C$<#?~KZ-DDeXX)QzTuABLjtgcmYY5zFvH@J38 zA8b}H9;qIp-%mI}Y$3#1U31D&RaN6~EO7aLeA-B#RXYix}Il8=+=Wsra@cDxGSs#f>fzV$6ke<|}4Y62X zAm|UeT^rJ)h9V`?iFk5^UUtZ`>~;-|YFS@+SWDqq;y^YTX!p0Q@#8@_ku(i`x) zU2Yd`vLI;kmo2>9e3LI#H2I+@tXGfRWFs1+#hQ#q)|V`WWLetW zZ3=oGm(J!&FAp};d#PeJ$KIF)3T~o}2Eo|{Ia*9``|Nq1!x|kMzko)m;3u45Q@@rS z)3S=xukXtwwW6-ILnL8MGaiwdm2>E}nYS)YWp#}=z)y$hV3Fs8+< zKV8txbC~r>%et4hUKcksMT(6aSS`sNQooi#zQHyrC@X7P6}eSf-y1Wfb%FHZ@A$!N z(w8BWR3aBj$9%&qVC5!J+>Fw3y|d-f^D8EDM(97T&TaK5F=S(}Vhut__Oc4Y^%uT|577 z{4B8dsQjq&8GC)OUnsKwRgm$DLp;Z{_xddRG8xB;@5UZI70>3(jBg1n^%p$dX4wVH z4@8!Cz)_GdEtYZH?Mpm|S$0{LUOr>f=nsKWevBvCj3VWXo`G|(h7`**s<~S`$#a-d zq-cb7y6OgRgk~zAN&=MxDhX5)c#jjH8^}D?`|q*P(=yk4=eMC(qV+!TVD2A?d|g@Z z-{Wch->A{|v+hahvFvl?;h8{akJJ)%>(H=f5ICP891v82d*d UM-yV;uYp^oTDgXcg literal 0 HcmV?d00001 diff --git a/packetbeat/tests/system/test_0063_http_body.py b/packetbeat/tests/system/test_0063_http_body.py index 2bcf84e54ab2..4beede566cf0 100644 --- a/packetbeat/tests/system/test_0063_http_body.py +++ b/packetbeat/tests/system/test_0063_http_body.py @@ -90,3 +90,25 @@ def test_wrong_content_type(self): assert "request" not in o assert "response" not in o + + def test_large_body(self): + """ + Checks that the transaction is still created if the + message is larger than the max_message_size. + """ + self.render_config_template( + http_include_body_for=["binary"], + http_ports=[8000], + http_max_message_size=1024 + ) + self.run_packetbeat(pcap="http_get_2k_file.pcap", + debug_selectors=["*"]) + objs = self.read_output() + + assert len(objs) == 1 + o = objs[0] + print len(o["http.response.body"]) + + # response body should be included but trimmed + assert len(o["http.response.body"]) < 2000 + assert len(o["http.response.body"]) > 500 From e2754c046c8c9907d467f60591f7fefb3228f44f Mon Sep 17 00:00:00 2001 From: Tudor Golubenco Date: Thu, 8 Sep 2016 13:23:27 +0200 Subject: [PATCH 3/3] Don't include body if the Content-Type is not set --- packetbeat/protos/http/http.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packetbeat/protos/http/http.go b/packetbeat/protos/http/http.go index 70e7d2f3a86f..13736f7752d4 100644 --- a/packetbeat/protos/http/http.go +++ b/packetbeat/protos/http/http.go @@ -588,7 +588,7 @@ func parseCookieValue(raw string) string { func (http *HTTP) extractBody(m *message) []byte { body := []byte{} - if len(m.ContentType) == 0 || http.shouldIncludeInBody(m.ContentType) { + if len(m.ContentType) > 0 && http.shouldIncludeInBody(m.ContentType) { if len(m.chunkedBody) > 0 { body = append(body, m.chunkedBody...) } else {