Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC for creating TCP Reset toxic #247

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions link.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package toxiproxy

import (
"io"

"github.com/Shopify/toxiproxy/stream"
"github.com/Shopify/toxiproxy/toxics"
"github.com/sirupsen/logrus"
"io"
"net"
)

// ToxicLinks are single direction pipelines that connects an input and output via
Expand Down Expand Up @@ -54,6 +54,7 @@ func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Di

// Start the link with the specified toxics
func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) {

go func() {
bytes, err := io.Copy(link.input, source)
if err != nil {
Expand All @@ -69,6 +70,24 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser)
if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok {
link.stubs[i].State = stateful.NewState()
}
if _, ok := toxic.Toxic.(*toxics.ResetToxic); ok {
if err := source.(*net.TCPConn).SetLinger(0); err != nil {
logrus.WithFields(logrus.Fields{
"name": link.proxy.Name,
"toxic": toxic.Type,
"err": err,
}).Error("source: Unable to setLinger(ms)")

}
if err := dest.(*net.TCPConn).SetLinger(0); err != nil {
logrus.WithFields(logrus.Fields{
"name": link.proxy.Name,
"toxic": toxic.Type,
"err": err,
}).Error("dest: Unable to setLinger(ms)")

}
}

go link.stubs[i].Run(toxic)
}
Expand Down
37 changes: 37 additions & 0 deletions toxics/reset_peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package toxics

import (
"time"
)

/*
The ResetToxic sends closes the connection abruptly after a timeout (in ms). The behaviour of Close is set to discard any unsent/unacknowledged data by setting SetLinger to 0,
~= sets TCP RST flag and resets the connection.
If the timeout is set to 0, then the connection will be reset immediately.

Drop data since it will initiate a graceful close by sending the FIN/ACK. (io.EOF)
*/

type ResetToxic struct {
// Timeout in milliseconds
Timeout int64 `json:"timeout"`
}

func (t *ResetToxic) Pipe(stub *ToxicStub) {
timeout := time.Duration(t.Timeout) * time.Millisecond

for {
select {
case <-stub.Interrupt:
return
case <-stub.Input:
<-time.After(timeout)
stub.Close()
return
}
}
}

func init() {
Register("reset_peer", new(ResetToxic))
}
83 changes: 83 additions & 0 deletions toxics/reset_peer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package toxics_test

import (
"bufio"
"github.com/Shopify/toxiproxy/toxics"
"io"
"net"
"os"
"syscall"
"testing"
"time"
)

const msg = "reset toxic payload\n"

func TestResetToxicNoTimeout(t *testing.T) {
resetTCPHelper(t, ToxicToJson(t, "resettcp", "reset_peer", "upstream", &toxics.ResetToxic{}))
}

func TestResetToxicWithTimeout(t *testing.T) {
start := time.Now()
resetToxic := toxics.ResetToxic{Timeout: 100}
resetTCPHelper(t, ToxicToJson(t, "resettcp", "reset_peer", "upstream", &resetToxic))
AssertDeltaTime(t, "Reset after timeout", time.Since(start), time.Duration(resetToxic.Timeout)*time.Millisecond, time.Duration(resetToxic.Timeout+10)*time.Millisecond)
}

func TestResetToxicWithTimeoutDownstream(t *testing.T) {
start := time.Now()
resetToxic := toxics.ResetToxic{Timeout: 100}
resetTCPHelper(t, ToxicToJson(t, "resettcp", "reset_peer", "downstream", &resetToxic))
AssertDeltaTime(t, "Reset after timeout", time.Since(start), time.Duration(resetToxic.Timeout)*time.Millisecond, time.Duration(resetToxic.Timeout+10)*time.Millisecond)

}

func checkConnectionState(t *testing.T, listenAddress string) {
conn, err := net.Dial("tcp", listenAddress)
if err != nil {
t.Error("Unable to dial TCP server", err)
}
if _, err := conn.Write([]byte(msg)); err != nil {
t.Error("Failed writing TCP payload", err)
}
tmp := make([]byte, 1000)
_, err = conn.Read(tmp)
defer conn.Close()
if opErr, ok := err.(*net.OpError); ok {
syscallErr, _ := opErr.Err.(*os.SyscallError)
if !(syscallErr.Err == syscall.ECONNRESET) {
t.Error("Expected: connection reset by peer. Got:", err)
}
} else {
t.Error("Expected: connection reset by peer. Got:", err, "conn:", conn.RemoteAddr(), conn.LocalAddr())
}
_, err = conn.Read(tmp)
if err != io.EOF {
t.Error("expected EOF from closed connection")
}
}

func resetTCPHelper(t *testing.T, toxicJSON io.Reader) {
ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatal("Failed to create TCP server", err)
}
defer ln.Close()
proxy := NewTestProxy("test", ln.Addr().String())
proxy.Start()
proxy.Toxics.AddToxicJson(toxicJSON)
defer proxy.Stop()

go func() {
conn, err := ln.Accept()
if err != nil {
t.Error("Unable to accept TCP connection", err)
}
defer ln.Close()
scan := bufio.NewScanner(conn)
if scan.Scan() {
conn.Write([]byte(msg))
}
}()
checkConnectionState(t, proxy.Listen)
}