diff --git a/prefork/README.md b/prefork/README.md new file mode 100644 index 0000000000..31f2c43c31 --- /dev/null +++ b/prefork/README.md @@ -0,0 +1,90 @@ +# Prefork + +Server prefork implementation. + +Preforks master process between several child processes increases performance, because Go doesn't have to share and manage memory between cores. + +**WARNING: using prefork prevents the use of any global state!. Things like in-memory caches won't work.** + +- How it works: + +```go +import ( + "github.com/valyala/fasthttp" + "github.com/valyala/fasthttp/prefork" +) + +server := &fasthttp.Server{ + // Your configuration +} + +// Wraps the server with prefork +preforkServer := prefork.New(server) + +if err := preforkServer.ListenAndServe(":8080"); err != nil { + panic(err) +} +``` + +## Benchmarks + +Environment: + +- Machine: MacBook Pro 13-inch, 2017 +- OS: MacOS 10.15.3 +- Go: go1.13.6 darwin/amd64 + +Handler code: + +```go +func requestHandler(ctx *fasthttp.RequestCtx) { + // Simulates some hard work + time.Sleep(100 * time.Millisecond) +} +``` + +Test command: + +```bash +$ wrk -H 'Host: localhost' -H 'Accept: text/plain,text/html;q=0.9,application/xhtml+xml;q=0.9,application/xml;q=0.8,*/*;q=0.7' -H 'Connection: keep-alive' --latency -d 15 -c 512 --timeout 8 -t 4 http://localhost:8080 +``` + +Results: + +- prefork + +```bash +Running 15s test @ http://localhost:8080 + 4 threads and 512 connections + Thread Stats Avg Stdev Max +/- Stdev + Latency 4.75ms 4.27ms 126.24ms 97.45% + Req/Sec 26.46k 4.16k 71.18k 88.72% + Latency Distribution + 50% 4.55ms + 75% 4.82ms + 90% 5.46ms + 99% 15.49ms + 1581916 requests in 15.09s, 140.30MB read + Socket errors: connect 0, read 318, write 0, timeout 0 +Requests/sec: 104861.58 +Transfer/sec: 9.30MB +``` + +- **non**-prefork + +```bash +Running 15s test @ http://localhost:8080 + 4 threads and 512 connections + Thread Stats Avg Stdev Max +/- Stdev + Latency 6.42ms 11.83ms 177.19ms 96.42% + Req/Sec 24.96k 5.83k 56.83k 82.93% + Latency Distribution + 50% 4.53ms + 75% 4.93ms + 90% 6.94ms + 99% 74.54ms + 1472441 requests in 15.09s, 130.59MB read + Socket errors: connect 0, read 265, write 0, timeout 0 +Requests/sec: 97553.34 +Transfer/sec: 8.65MB +``` diff --git a/prefork/prefork.go b/prefork/prefork.go new file mode 100644 index 0000000000..8fcc842d18 --- /dev/null +++ b/prefork/prefork.go @@ -0,0 +1,188 @@ +package prefork + +import ( + "flag" + "net" + "os" + "os/exec" + "runtime" + + "github.com/valyala/fasthttp" + "github.com/valyala/fasthttp/reuseport" +) + +const preforkChildFlag = "-prefork-child" +const defaultNetwork = "tcp4" + +// Prefork implements fasthttp server prefork +// +// Preforks master process (with all cores) between several child processes +// increases performance significantly, because Go doesn't have to share +// and manage memory between cores +// +// WARNING: using prefork prevents the use of any global state! +// Things like in-memory caches won't work. +type Prefork struct { + // The network must be "tcp", "tcp4" or "tcp6". + // + // By default is "tcp4" + Network string + + // Flag to use a listener with reuseport, if not a file Listener will be used + // See: https://www.nginx.com/blog/socket-sharding-nginx-release-1-9-1/ + // + // It's disabled by default + Reuseport bool + + ServeFunc func(ln net.Listener) error + ServeTLSFunc func(ln net.Listener, certFile, keyFile string) error + ServeTLSEmbedFunc func(ln net.Listener, certData, keyData []byte) error + + ln net.Listener + files []*os.File +} + +func init() { //nolint:gochecknoinits + // Definition flag to not break the program when the user adds their own flags + // and runs `flag.Parse()` + flag.Bool(preforkChildFlag[1:], false, "Is a child process") +} + +// IsChild checks if the current thread/process is a child +func IsChild() bool { + for _, arg := range os.Args[1:] { + if arg == preforkChildFlag { + return true + } + } + + return false +} + +// New wraps the fasthttp server to run with preforked processes +func New(s *fasthttp.Server) *Prefork { + return &Prefork{ + Network: defaultNetwork, + ServeFunc: s.Serve, + ServeTLSFunc: s.ServeTLS, + ServeTLSEmbedFunc: s.ServeTLSEmbed, + } +} + +func (p *Prefork) listen(addr string) (net.Listener, error) { + runtime.GOMAXPROCS(1) + + if p.Network == "" { + p.Network = defaultNetwork + } + + if p.Reuseport { + return reuseport.Listen(p.Network, addr) + } + + return net.FileListener(os.NewFile(3, "")) +} + +func (p *Prefork) setTCPListenerFiles(addr string) error { + if p.Network == "" { + p.Network = defaultNetwork + } + + tcpAddr, err := net.ResolveTCPAddr(p.Network, addr) + if err != nil { + return err + } + + tcplistener, err := net.ListenTCP(p.Network, tcpAddr) + if err != nil { + return err + } + + p.ln = tcplistener + + fl, err := tcplistener.File() + if err != nil { + return err + } + + p.files = []*os.File{fl} + + return nil +} + +func (p *Prefork) prefork(addr string) error { + chErr := make(chan error, 1) + + if !p.Reuseport { + if err := p.setTCPListenerFiles(addr); err != nil { + return err + } + + defer p.ln.Close() + } + + for i := 0; i < runtime.GOMAXPROCS(0); i++ { + cmd := exec.Command(os.Args[0], append(os.Args[1:], preforkChildFlag)...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.ExtraFiles = p.files + + go func() { + chErr <- cmd.Run() + }() + } + + return <-chErr +} + +// ListenAndServe serves HTTP requests from the given TCP addr +func (p *Prefork) ListenAndServe(addr string) error { + if IsChild() { + ln, err := p.listen(addr) + if err != nil { + return err + } + + p.ln = ln + + return p.ServeFunc(ln) + } + + return p.prefork(addr) +} + +// ListenAndServeTLS serves HTTPS requests from the given TCP addr +// +// certFile and keyFile are paths to TLS certificate and key files. +func (p *Prefork) ListenAndServeTLS(addr, certKey, certFile string) error { + if IsChild() { + ln, err := p.listen(addr) + if err != nil { + return err + } + + p.ln = ln + + return p.ServeTLSFunc(ln, certFile, certKey) + } + + return p.prefork(addr) +} + +// ListenAndServeTLSEmbed serves HTTPS requests from the given TCP addr +// +// certData and keyData must contain valid TLS certificate and key data. +func (p *Prefork) ListenAndServeTLSEmbed(addr string, certData, keyData []byte) error { + if IsChild() { + ln, err := p.listen(addr) + if err != nil { + return err + } + + p.ln = ln + + return p.ServeTLSEmbedFunc(ln, certData, keyData) + } + + return p.prefork(addr) +} diff --git a/prefork/prefork_test.go b/prefork/prefork_test.go new file mode 100644 index 0000000000..12f84452db --- /dev/null +++ b/prefork/prefork_test.go @@ -0,0 +1,214 @@ +package prefork + +import ( + "fmt" + "math/rand" + "net" + "os" + "reflect" + "runtime" + "testing" + + "github.com/valyala/fasthttp" +) + +func setUp() { + os.Args = append(os.Args, preforkChildFlag) +} + +func tearDown() { + os.Args = os.Args[:len(os.Args)-1] +} + +func getAddr() string { + return fmt.Sprintf("0.0.0.0:%d", rand.Intn(9000-3000)+3000) +} + +func Test_IsChild(t *testing.T) { + v := IsChild() + if v { + t.Errorf("IsChild() == %v, want %v", v, false) + } + + setUp() + + v = IsChild() + if !v { + t.Errorf("IsChild() == %v, want %v", v, true) + } + + tearDown() +} + +func Test_New(t *testing.T) { + s := &fasthttp.Server{} + p := New(s) + + if p.Network != defaultNetwork { + t.Errorf("Prefork.Netork == %s, want %s", p.Network, defaultNetwork) + } + + if reflect.ValueOf(p.ServeFunc).Pointer() != reflect.ValueOf(s.Serve).Pointer() { + t.Errorf("Prefork.ServeFunc == %p, want %p", p.ServeFunc, s.Serve) + } + + if reflect.ValueOf(p.ServeTLSFunc).Pointer() != reflect.ValueOf(s.ServeTLS).Pointer() { + t.Errorf("Prefork.ServeTLSFunc == %p, want %p", p.ServeTLSFunc, s.ServeTLS) + } + + if reflect.ValueOf(p.ServeTLSEmbedFunc).Pointer() != reflect.ValueOf(s.ServeTLSEmbed).Pointer() { + t.Errorf("Prefork.ServeTLSFunc == %p, want %p", p.ServeTLSEmbedFunc, s.ServeTLSEmbed) + } +} + +func Test_listen(t *testing.T) { + p := &Prefork{ + Reuseport: true, + } + addr := getAddr() + + ln, err := p.listen(addr) + + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + ln.Close() + + lnAddr := ln.Addr().String() + if lnAddr != addr { + t.Errorf("Prefork.Addr == %s, want %s", lnAddr, addr) + } + + if p.Network != defaultNetwork { + t.Errorf("Prefork.Network == %s, want %s", p.Network, defaultNetwork) + } + + procs := runtime.GOMAXPROCS(0) + if procs != 1 { + t.Errorf("GOMAXPROCS == %d, want %d", procs, 1) + } +} + +func Test_setTCPListenerFiles(t *testing.T) { + p := &Prefork{} + addr := getAddr() + + err := p.setTCPListenerFiles(addr) + + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if p.ln == nil { + t.Fatal("Prefork.ln is nil") + } + + p.ln.Close() + + lnAddr := p.ln.Addr().String() + if lnAddr != addr { + t.Errorf("Prefork.Addr == %s, want %s", lnAddr, addr) + } + + if p.Network != defaultNetwork { + t.Errorf("Prefork.Network == %s, want %s", p.Network, defaultNetwork) + } + + if len(p.files) != 1 { + t.Errorf("Prefork.files == %d, want %d", len(p.files), 1) + } +} + +func Test_ListenAndServe(t *testing.T) { + setUp() + + s := &fasthttp.Server{} + p := New(s) + p.Reuseport = true + p.ServeFunc = func(ln net.Listener) error { + return nil + } + + addr := getAddr() + + err := p.ListenAndServe(addr) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + p.ln.Close() + + lnAddr := p.ln.Addr().String() + if lnAddr != addr { + t.Errorf("Prefork.Addr == %s, want %s", lnAddr, addr) + } + + if p.ln == nil { + t.Error("Prefork.ln is nil") + } + + tearDown() +} + +func Test_ListenAndServeTLS(t *testing.T) { + setUp() + + s := &fasthttp.Server{} + p := New(s) + p.Reuseport = true + p.ServeTLSFunc = func(ln net.Listener, certFile, keyFile string) error { + return nil + } + + addr := getAddr() + + err := p.ListenAndServeTLS(addr, "./key", "./cert") + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + p.ln.Close() + + lnAddr := p.ln.Addr().String() + if lnAddr != addr { + t.Errorf("Prefork.Addr == %s, want %s", lnAddr, addr) + } + + if p.ln == nil { + t.Error("Prefork.ln is nil") + } + + tearDown() +} + +func Test_ListenAndServeTLSEmbed(t *testing.T) { + setUp() + + s := &fasthttp.Server{} + p := New(s) + p.Reuseport = true + p.ServeTLSEmbedFunc = func(ln net.Listener, certData, keyData []byte) error { + return nil + } + + addr := getAddr() + + err := p.ListenAndServeTLSEmbed(addr, []byte("key"), []byte("cert")) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + p.ln.Close() + + lnAddr := p.ln.Addr().String() + if lnAddr != addr { + t.Errorf("Prefork.Addr == %s, want %s", lnAddr, addr) + } + + if p.ln == nil { + t.Error("Prefork.ln is nil") + } + + tearDown() +}