Skip to content

Commit

Permalink
agent/txn_endpoint: configure max txn request length (#7388)
Browse files Browse the repository at this point in the history
configure max transaction size separately from kv limit
  • Loading branch information
findkim committed Mar 17, 2020
1 parent 2a5412e commit 99936c1
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 82 deletions.
1 change: 1 addition & 0 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
TLSPreferServerCipherSuites: b.boolVal(c.TLSPreferServerCipherSuites),
TaggedAddresses: c.TaggedAddresses,
TranslateWANAddrs: b.boolVal(c.TranslateWANAddrs),
TxnMaxReqLen: b.uint64Val(c.Limits.TxnMaxReqLen),
UIDir: b.stringVal(c.UIDir),
UIContentPath: UIPathBuilder(b.stringVal(c.UIContentPath)),
UnixSocketGroup: b.stringVal(c.UnixSocket.Group),
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ type Limits struct {
RPCMaxConnsPerClient *int `json:"rpc_max_conns_per_client,omitempty" hcl:"rpc_max_conns_per_client" mapstructure:"rpc_max_conns_per_client"`
RPCRate *float64 `json:"rpc_rate,omitempty" hcl:"rpc_rate" mapstructure:"rpc_rate"`
KVMaxValueSize *uint64 `json:"kv_max_value_size,omitempty" hcl:"kv_max_value_size" mapstructure:"kv_max_value_size"`
TxnMaxReqLen *uint64 `json:"txn_max_req_len,omitempty" hcl:"txn_max_req_len" mapstructure:"txn_max_req_len"`
}

type Segment struct {
Expand Down
1 change: 1 addition & 0 deletions agent/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func DefaultSource() Source {
rpc_max_burst = 1000
rpc_max_conns_per_client = 100
kv_max_value_size = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + `
txn_max_req_len = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + `
}
performance = {
leave_drain_time = "5s"
Expand Down
6 changes: 6 additions & 0 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,12 @@ type RuntimeConfig struct {
// hcl: translate_wan_addrs = (true|false)
TranslateWANAddrs bool

// TxnMaxReqLen configures the upper limit for the size (in bytes) of the
// incoming request bodies for transactions to the /txn endpoint.
//
// hcl: limits { txn_max_req_len = uint64 }
TxnMaxReqLen uint64

// UIDir is the directory containing the Web UI resources.
// If provided, the UI endpoints will be enabled.
//
Expand Down
7 changes: 6 additions & 1 deletion agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3905,7 +3905,8 @@ func TestFullConfig(t *testing.T) {
"rpc_rate": 12029.43,
"rpc_max_burst": 44848,
"rpc_max_conns_per_client": 2954,
"kv_max_value_size": 1234567800000000
"kv_max_value_size": 1234567800000000,
"txn_max_req_len": 5678000000000000
},
"log_level": "k1zo9Spt",
"log_json": true,
Expand Down Expand Up @@ -4535,6 +4536,7 @@ func TestFullConfig(t *testing.T) {
rpc_max_burst = 44848
rpc_max_conns_per_client = 2954
kv_max_value_size = 1234567800000000
txn_max_req_len = 5678000000000000
}
log_level = "k1zo9Spt"
log_json = true
Expand Down Expand Up @@ -5608,6 +5610,7 @@ func TestFullConfig(t *testing.T) {
"wan_ipv4": "78.63.37.19",
},
TranslateWANAddrs: true,
TxnMaxReqLen: 5678000000000000,
UIContentPath: "/consul/",
UIDir: "11IFzAUn",
UnixSocketUser: "E0nB1DwA",
Expand Down Expand Up @@ -5943,6 +5946,7 @@ func TestSanitize(t *testing.T) {
},
},
KVMaxValueSize: 1234567800000000,
TxnMaxReqLen: 5678000000000000,
}

rtJSON := `{
Expand Down Expand Up @@ -6252,6 +6256,7 @@ func TestSanitize(t *testing.T) {
"StatsiteAddr": ""
},
"TranslateWANAddrs": false,
"TxnMaxReqLen": 5678000000000000,
"UIDir": "",
"UIContentPath": "",
"UnixSocketGroup": "",
Expand Down
68 changes: 39 additions & 29 deletions agent/txn_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/base64"
"fmt"
"net/http"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -65,27 +64,49 @@ func isWrite(op api.KVOp) bool {
// a boolean, that if false means an error response has been generated and
// processing should stop.
func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (structs.TxnOps, int, bool) {
// The TxnMaxReqLen limit and KVMaxValueSize limit both default to the
// suggested raft data size and can be configured independently. The
// TxnMaxReqLen is enforced on the cumulative size of the transaction,
// whereas the KVMaxValueSize limit is imposed on the values of individual KV
// operations -- this is to keep consistent with the behavior for KV values
// in the kvs endpoint.
//
// The defaults are set to the suggested raft size to keep the total
// transaction size reasonable to account for timely heartbeat signals. If
// the TxnMaxReqLen limit is above the raft's suggested threshold, large
// transactions are automatically set to attempt a chunking apply.
// Performance may degrade and warning messages may appear.
maxTxnLen := int64(s.agent.config.TxnMaxReqLen)
kvMaxValueSize := int64(s.agent.config.KVMaxValueSize)

// For backward compatibility, KVMaxValueSize is used as the max txn request
// length if it is configured greater than TxnMaxReqLen or its default
if maxTxnLen < kvMaxValueSize {
maxTxnLen = kvMaxValueSize
}

sizeStr := req.Header.Get("Content-Length")
if sizeStr != "" {
if size, err := strconv.Atoi(sizeStr); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Failed to parse Content-Length: %v", err)
return nil, 0, false
} else if size > int(s.agent.config.KVMaxValueSize) {
resp.WriteHeader(http.StatusRequestEntityTooLarge)
fmt.Fprintf(resp, "Request body too large, max size: %v bytes", s.agent.config.KVMaxValueSize)
return nil, 0, false
}
// Check Content-Length first before decoding to return early
if req.ContentLength > maxTxnLen {
resp.WriteHeader(http.StatusRequestEntityTooLarge)
fmt.Fprintf(resp, "Request body too large, max size: %v bytes", maxTxnLen)
return nil, 0, false
}

// Note the body is in API format, and not the RPC format. If we can't
// decode it, we will return a 400 since we don't have enough context to
// associate the error with a given operation.
var ops api.TxnOps
req.Body = http.MaxBytesReader(resp, req.Body, maxTxnLen)
if err := decodeBody(req.Body, &ops); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Failed to parse body: %v", err)
if err.Error() == "http: request body too large" {
// The request size is also verified during decoding to double check
// if the Content-Length header was not set by the client.
resp.WriteHeader(http.StatusRequestEntityTooLarge)
fmt.Fprintf(resp, "Request body too large, max size: %v bytes", maxTxnLen)
} else {
// Note the body is in API format, and not the RPC format. If we can't
// decode it, we will return a 400 since we don't have enough context to
// associate the error with a given operation.
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Failed to parse body: %v", err)
}
return nil, 0, false
}

Expand All @@ -104,17 +125,15 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st
// byte arrays so we can assign right over.
var opsRPC structs.TxnOps
var writes int
var netKVSize uint64
for _, in := range ops {
switch {
case in.KV != nil:
size := len(in.KV.Value)
if uint64(size) > s.agent.config.KVMaxValueSize {
if int64(size) > kvMaxValueSize {
resp.WriteHeader(http.StatusRequestEntityTooLarge)
fmt.Fprintf(resp, "Value for key %q is too large (%d > %d bytes)", in.KV.Key, size, s.agent.config.KVMaxValueSize)
return nil, 0, false
}
netKVSize += uint64(size)

verb := in.KV.Verb
if isWrite(verb) {
Expand Down Expand Up @@ -257,15 +276,6 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st
}
}

// Enforce an overall size limit to help prevent abuse.
if netKVSize > s.agent.config.KVMaxValueSize {
resp.WriteHeader(http.StatusRequestEntityTooLarge)
fmt.Fprintf(resp, "Cumulative size of key data is too large (%d > %d bytes)",
netKVSize, s.agent.config.KVMaxValueSize)

return nil, 0, false
}

return opsRPC, writes, true
}

Expand Down
93 changes: 41 additions & 52 deletions agent/txn_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -54,7 +53,6 @@ func TestTxnEndpoint_Bad_Size_Item(t *testing.T) {
]
`, value)))
req, _ := http.NewRequest("PUT", "/v1/txn", buf)
req.Header.Add("Content-Length", fmt.Sprintf("%d", buf.Len()))
resp := httptest.NewRecorder()
if _, err := agent.srv.Txn(resp, req); err != nil {
t.Fatalf("err: %v", err)
Expand All @@ -67,14 +65,30 @@ func TestTxnEndpoint_Bad_Size_Item(t *testing.T) {
}
}

t.Run("toobig", func(t *testing.T) {
t.Run("exceeds default limits", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "")
testIt(t, a, false)
a.Shutdown()
})

t.Run("exceeds configured max txn len", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "limits = { txn_max_req_len = 700000 }")
testIt(t, a, false)
a.Shutdown()
})

t.Run("exceeds default max kv value size", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "limits = { txn_max_req_len = 123456789 }")
testIt(t, a, false)
a.Shutdown()
})

t.Run("allowed", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "limits = { kv_max_value_size = 123456789 }")
a := NewTestAgent(t, t.Name(), `
limits = {
txn_max_req_len = 123456789
kv_max_value_size = 123456789
}`)
testIt(t, a, true)
a.Shutdown()
})
Expand Down Expand Up @@ -124,13 +138,35 @@ func TestTxnEndpoint_Bad_Size_Net(t *testing.T) {
}
}

t.Run("toobig", func(t *testing.T) {
t.Run("exceeds default limits", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "")
testIt(a, false)
a.Shutdown()
})

t.Run("exceeds configured max txn len", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "limits = { txn_max_req_len = 700000 }")
testIt(a, false)
a.Shutdown()
})

t.Run("exceeds default max kv value size", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "limits = { txn_max_req_len = 123456789 }")
testIt(a, false)
a.Shutdown()
})

t.Run("allowed", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), `
limits = {
txn_max_req_len = 123456789
kv_max_value_size = 123456789
}`)
testIt(a, true)
a.Shutdown()
})

t.Run("allowed kv max backward compatible", func(t *testing.T) {
a := NewTestAgent(t, t.Name(), "limits = { kv_max_value_size = 123456789 }")
testIt(a, true)
a.Shutdown()
Expand Down Expand Up @@ -612,50 +648,3 @@ func TestTxnEndpoint_UpdateCheck(t *testing.T) {
}
assert.Equal(t, expected, txnResp)
}

func TestConvertOps_ContentLength(t *testing.T) {
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()

jsonBody := `[
{
"KV": {
"Verb": "set",
"Key": "key1",
"Value": "aGVsbG8gd29ybGQ="
}
}
]`

tests := []struct {
contentLength string
ok bool
}{
{"", true},
{strconv.Itoa(len(jsonBody)), true},
{strconv.Itoa(raft.SuggestedMaxDataSize), true},
{strconv.Itoa(raft.SuggestedMaxDataSize + 100), false},
}

for _, tc := range tests {
t.Run("contentLength: "+tc.contentLength, func(t *testing.T) {
resp := httptest.NewRecorder()
var body bytes.Buffer

// Doesn't matter what the request body size actually is, as we only
// check 'Content-Length' header in this test anyway.
body.WriteString(jsonBody)

req := httptest.NewRequest("POST", "http://foo.com", &body)
req.Header.Add("Content-Length", tc.contentLength)

_, _, ok := a.srv.convertOps(resp, req)
if ok != tc.ok {
t.Fatal("ok != tc.ok")
}

})

}

}
8 changes: 8 additions & 0 deletions website/source/docs/agent/options.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,14 @@ default will automatically work with some tooling.
single RPC call to a Consul server. See
https://en.wikipedia.org/wiki/Token_bucket for more details about how
token bucket rate limiters operate.
* <a name="txn_max_req_len"></a><a href="#txn_max_req_len">
`txn_max_req_len`</a> - Configures the maximum number of
bytes for a transaction request body to the [`/v1/txn`](/api/txn.html)
endpoint. This limit defaults to [raft's](https://github.com/hashicorp/raft)
suggested max size. **Note that increasing beyond this default can
cause Consul to fail in unexpected ways**, it may potentially affect
leadership stability and prevent timely heartbeat signals by
increasing RPC IO duration.
* <a name="log_file"></a><a href="#log_file">`log_file`</a> Equivalent to the
[`-log-file` command-line flag](#_log_file).
Expand Down

0 comments on commit 99936c1

Please sign in to comment.