Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent/txn_endpoint: configure max txn request length #7388

Merged
merged 8 commits into from
Mar 5, 2020
1 change: 1 addition & 0 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
TLSPreferServerCipherSuites: b.boolVal(c.TLSPreferServerCipherSuites),
TaggedAddresses: c.TaggedAddresses,
TranslateWANAddrs: b.boolVal(c.TranslateWANAddrs),
TxnMaxReqLen: b.uint64Val(c.Limits.TxnMaxReqLen),
UIDir: b.stringVal(c.UIDir),
UIContentPath: UIPathBuilder(b.stringVal(c.UIContentPath)),
UnixSocketGroup: b.stringVal(c.UnixSocket.Group),
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ type Limits struct {
RPCMaxConnsPerClient *int `json:"rpc_max_conns_per_client,omitempty" hcl:"rpc_max_conns_per_client" mapstructure:"rpc_max_conns_per_client"`
RPCRate *float64 `json:"rpc_rate,omitempty" hcl:"rpc_rate" mapstructure:"rpc_rate"`
KVMaxValueSize *uint64 `json:"kv_max_value_size,omitempty" hcl:"kv_max_value_size" mapstructure:"kv_max_value_size"`
TxnMaxReqLen *uint64 `json:"txn_max_req_len,omitempty" hcl:"txn_max_req_len" mapstructure:"txn_max_req_len"`
}

type Segment struct {
Expand Down
1 change: 1 addition & 0 deletions agent/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func DefaultSource() Source {
rpc_max_burst = 1000
rpc_max_conns_per_client = 100
kv_max_value_size = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + `
txn_max_req_len = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + `
}
performance = {
leave_drain_time = "5s"
Expand Down
6 changes: 6 additions & 0 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -1452,6 +1452,12 @@ type RuntimeConfig struct {
// hcl: translate_wan_addrs = (true|false)
TranslateWANAddrs bool

// TxnMaxReqLen configures the upper limit for the size (in bytes) of the
// incoming request bodies for transactions to the /txn endpoint.
//
// hcl: limits { txn_max_req_len = uint64 }
TxnMaxReqLen uint64

// UIDir is the directory containing the Web UI resources.
// If provided, the UI endpoints will be enabled.
//
Expand Down
7 changes: 6 additions & 1 deletion agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3870,7 +3870,8 @@ func TestFullConfig(t *testing.T) {
"rpc_rate": 12029.43,
"rpc_max_burst": 44848,
"rpc_max_conns_per_client": 2954,
"kv_max_value_size": 1234567800000000
"kv_max_value_size": 1234567800000000,
"txn_max_req_len": 5678000000000000
},
"log_level": "k1zo9Spt",
"log_json": true,
Expand Down Expand Up @@ -4500,6 +4501,7 @@ func TestFullConfig(t *testing.T) {
rpc_max_burst = 44848
rpc_max_conns_per_client = 2954
kv_max_value_size = 1234567800000000
txn_max_req_len = 5678000000000000
}
log_level = "k1zo9Spt"
log_json = true
Expand Down Expand Up @@ -5573,6 +5575,7 @@ func TestFullConfig(t *testing.T) {
"wan_ipv4": "78.63.37.19",
},
TranslateWANAddrs: true,
TxnMaxReqLen: 5678000000000000,
UIContentPath: "/consul/",
UIDir: "11IFzAUn",
UnixSocketUser: "E0nB1DwA",
Expand Down Expand Up @@ -5908,6 +5911,7 @@ func TestSanitize(t *testing.T) {
},
},
KVMaxValueSize: 1234567800000000,
TxnMaxReqLen: 5678000000000000,
}

rtJSON := `{
Expand Down Expand Up @@ -6217,6 +6221,7 @@ func TestSanitize(t *testing.T) {
"StatsiteAddr": ""
},
"TranslateWANAddrs": false,
"TxnMaxReqLen": 5678000000000000,
"UIDir": "",
"UIContentPath": "",
"UnixSocketGroup": "",
Expand Down
68 changes: 39 additions & 29 deletions agent/txn_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/base64"
"fmt"
"net/http"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -65,27 +64,49 @@ func isWrite(op api.KVOp) bool {
// a boolean, that if false means an error response has been generated and
// processing should stop.
func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (structs.TxnOps, int, bool) {
// The TxnMaxReqLen limit and KVMaxValueSize limit both default to the
// suggested raft data size and can be configured independently. The
// TxnMaxReqLen is enforced on the cumulative size of the transaction,
// whereas the KVMaxValueSize limit is imposed on the values of individual KV
// operations -- this is to keep consistent with the behavior for KV values
// in the kvs endpoint.
//
// The defaults are set to the suggested raft size to keep the total
// transaction size reasonable to account for timely heartbeat signals. If
// the TxnMaxReqLen limit is above the raft's suggested threshold, large
// transactions are automatically set to attempt a chunking apply.
// Performance may degrade and warning messages may appear.
maxTxnLen := int64(s.agent.config.TxnMaxReqLen)
kvMaxValueSize := int64(s.agent.config.KVMaxValueSize)

// For backward compatibility, KVMaxValueSize is used as the max txn request
// length if it is configured greater than TxnMaxReqLen or its default
if maxTxnLen < kvMaxValueSize {
maxTxnLen = kvMaxValueSize
}

sizeStr := req.Header.Get("Content-Length")
if sizeStr != "" {
if size, err := strconv.Atoi(sizeStr); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Failed to parse Content-Length: %v", err)
return nil, 0, false
} else if size > int(s.agent.config.KVMaxValueSize) {
resp.WriteHeader(http.StatusRequestEntityTooLarge)
fmt.Fprintf(resp, "Request body too large, max size: %v bytes", s.agent.config.KVMaxValueSize)
return nil, 0, false
}
// Check Content-Length first before decoding to return early
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this code existed before, but I don't like the duplication and I am wondering what your opinion is on removing the early return in favor of readability and maintainability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If availability of the agents are important, I think it's worth while to reduce the overhead of unnecessary processing time to decode a large request. Though, on the point of readability, what do you think about the change I just pushed up? It simplifies checking of content length from relying on the Content-Length header to using the actual request length.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that!

if req.ContentLength > maxTxnLen {
resp.WriteHeader(http.StatusRequestEntityTooLarge)
fmt.Fprintf(resp, "Request body too large, max size: %v bytes", maxTxnLen)
return nil, 0, false
}

// Note the body is in API format, and not the RPC format. If we can't
// decode it, we will return a 400 since we don't have enough context to
// associate the error with a given operation.
var ops api.TxnOps
req.Body = http.MaxBytesReader(resp, req.Body, maxTxnLen)
if err := decodeBody(req.Body, &ops); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Failed to parse body: %v", err)
if err.Error() == "http: request body too large" {
// The request size is also verified during decoding to double check
// if the Content-Length header was not set by the client.
resp.WriteHeader(http.StatusRequestEntityTooLarge)
fmt.Fprintf(resp, "Request body too large, max size: %v bytes", maxTxnLen)
} else {
// Note the body is in API format, and not the RPC format. If we can't
// decode it, we will return a 400 since we don't have enough context to
// associate the error with a given operation.
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Failed to parse body: %v", err)
}
return nil, 0, false
}

Expand All @@ -104,17 +125,15 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st
// byte arrays so we can assign right over.
var opsRPC structs.TxnOps
var writes int
var netKVSize uint64
for _, in := range ops {
switch {
case in.KV != nil:
size := len(in.KV.Value)
if uint64(size) > s.agent.config.KVMaxValueSize {
if int64(size) > kvMaxValueSize {
resp.WriteHeader(http.StatusRequestEntityTooLarge)
fmt.Fprintf(resp, "Value for key %q is too large (%d > %d bytes)", in.KV.Key, size, s.agent.config.KVMaxValueSize)
return nil, 0, false
}
netKVSize += uint64(size)

verb := in.KV.Verb
if isWrite(verb) {
Expand Down Expand Up @@ -257,15 +276,6 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st
}
}

// Enforce an overall size limit to help prevent abuse.
if netKVSize > s.agent.config.KVMaxValueSize {
resp.WriteHeader(http.StatusRequestEntityTooLarge)
fmt.Fprintf(resp, "Cumulative size of key data is too large (%d > %d bytes)",
netKVSize, s.agent.config.KVMaxValueSize)

return nil, 0, false
}

return opsRPC, writes, true
}

Expand Down
93 changes: 41 additions & 52 deletions agent/txn_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -54,7 +53,6 @@ func TestTxnEndpoint_Bad_Size_Item(t *testing.T) {
]
`, value)))
req, _ := http.NewRequest("PUT", "/v1/txn", buf)
req.Header.Add("Content-Length", fmt.Sprintf("%d", buf.Len()))
resp := httptest.NewRecorder()
if _, err := agent.srv.Txn(resp, req); err != nil {
t.Fatalf("err: %v", err)
Expand All @@ -67,14 +65,30 @@ func TestTxnEndpoint_Bad_Size_Item(t *testing.T) {
}
}

t.Run("toobig", func(t *testing.T) {
t.Run("exceeds default limits", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "")
testIt(t, a, false)
a.Shutdown()
})

t.Run("exceeds configured max txn len", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "limits = { txn_max_req_len = 700000 }")
testIt(t, a, false)
a.Shutdown()
})

t.Run("exceeds default max kv value size", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "limits = { txn_max_req_len = 123456789 }")
testIt(t, a, false)
a.Shutdown()
})

t.Run("allowed", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "limits = { kv_max_value_size = 123456789 }")
a := NewTestAgent(t, t.Name(), `
limits = {
txn_max_req_len = 123456789
kv_max_value_size = 123456789
}`)
testIt(t, a, true)
a.Shutdown()
})
Expand Down Expand Up @@ -124,13 +138,35 @@ func TestTxnEndpoint_Bad_Size_Net(t *testing.T) {
}
}

t.Run("toobig", func(t *testing.T) {
t.Run("exceeds default limits", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "")
testIt(a, false)
a.Shutdown()
})

t.Run("exceeds configured max txn len", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "limits = { txn_max_req_len = 700000 }")
testIt(a, false)
a.Shutdown()
})

t.Run("exceeds default max kv value size", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "limits = { txn_max_req_len = 123456789 }")
testIt(a, false)
a.Shutdown()
})

t.Run("allowed", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), `
limits = {
txn_max_req_len = 123456789
kv_max_value_size = 123456789
}`)
testIt(a, true)
a.Shutdown()
})

t.Run("allowed kv max backward compatible", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "limits = { kv_max_value_size = 123456789 }")
testIt(a, true)
a.Shutdown()
Expand Down Expand Up @@ -612,50 +648,3 @@ func TestTxnEndpoint_UpdateCheck(t *testing.T) {
}
assert.Equal(t, expected, txnResp)
}

func TestConvertOps_ContentLength(t *testing.T) {
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()

jsonBody := `[
{
"KV": {
"Verb": "set",
"Key": "key1",
"Value": "aGVsbG8gd29ybGQ="
}
}
]`

tests := []struct {
contentLength string
ok bool
}{
{"", true},
{strconv.Itoa(len(jsonBody)), true},
{strconv.Itoa(raft.SuggestedMaxDataSize), true},
{strconv.Itoa(raft.SuggestedMaxDataSize + 100), false},
}

for _, tc := range tests {
t.Run("contentLength: "+tc.contentLength, func(t *testing.T) {
resp := httptest.NewRecorder()
var body bytes.Buffer

// Doesn't matter what the request body size actually is, as we only
// check 'Content-Length' header in this test anyway.
body.WriteString(jsonBody)

req := httptest.NewRequest("POST", "http://foo.com", &body)
req.Header.Add("Content-Length", tc.contentLength)

_, _, ok := a.srv.convertOps(resp, req)
if ok != tc.ok {
t.Fatal("ok != tc.ok")
}

})

}

}
8 changes: 8 additions & 0 deletions website/source/docs/agent/options.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,14 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
single RPC call to a Consul server. See
https://en.wikipedia.org/wiki/Token_bucket for more details about how
token bucket rate limiters operate.
* <a name="txn_max_req_len"></a><a href="#txn_max_req_len">
`txn_max_req_len`</a> - Configures the maximum number of
bytes for a transaction request body to the [`/v1/txn`](/api/txn.html)
endpoint. This limit defaults to [raft's](https://github.com/hashicorp/raft)
suggested max size. **Note that increasing beyond this default can
cause Consul to fail in unexpected ways**, it may potentially affect
leadership stability and prevent timely heartbeat signals by
increasing RPC IO duration.

* <a name="log_file"></a><a href="#log_file">`log_file`</a> Equivalent to the
[`-log-file` command-line flag](#_log_file).
Expand Down