Skip to content

Commit

Permalink
Merge pull request #534 from grafana/refactor/browser-connection
Browse files Browse the repository at this point in the history
Minor BrowserType and Connection refactor
  • Loading branch information
Ivan Mirić authored Sep 19, 2022
2 parents 96b89a1 + 8280ec7 commit 430e82c
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 114 deletions.
101 changes: 1 addition & 100 deletions chromium/browser_type.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package chromium

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -233,17 +231,7 @@ func (b *BrowserType) allocate(
path = b.ExecutablePath()
}

cmd, stdout, err := execute(ctx, path, args, env, dataDir, logger)
if err != nil {
return nil, err
}

wsURL, err := parseDevToolsURL(ctx, stdout)
if err != nil {
return nil, fmt.Errorf("getting DevTools URL: %w", err)
}

return common.NewBrowserProcess(ctx, cancel, cmd.Process, wsURL, dataDir), nil
return common.NewBrowserProcess(ctx, path, args, env, dataDir, cancel, logger) //nolint: wrapcheck
}

// parseArgs parses command-line arguments and returns them.
Expand Down Expand Up @@ -368,93 +356,6 @@ func setFlagsFromK6Options(flags map[string]interface{}, k6opts *k6lib.Options)
}
}

func execute(
ctx context.Context, path string, args, env []string, dataDir *storage.Dir, logger *log.Logger,
) (*exec.Cmd, io.Reader, error) {
cmd := exec.CommandContext(ctx, path, args...)
killAfterParent(cmd)

stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, nil, fmt.Errorf("%w", err)
}
cmd.Stderr = cmd.Stdout

// Set up environment variable for process
if len(env) > 0 {
cmd.Env = append(os.Environ(), env...)
}

// We must start the cmd before calling cmd.Wait, as otherwise the two
// can run into a data race.
err = cmd.Start()
if os.IsNotExist(err) {
return nil, nil, fmt.Errorf("file does not exist: %s", path)
}
if err != nil {
return nil, nil, fmt.Errorf("%w", err)
}
if ctx.Err() != nil {
return nil, nil, fmt.Errorf("%w", ctx.Err())
}
go func() {
// TODO: How to handle these errors?
defer func() {
if err := dataDir.Cleanup(); err != nil {
logger.Errorf("BrowserType:Close", "cleaning up the user data directory: %v", err)
}
}()

if err := cmd.Wait(); err != nil {
logErr := logger.Errorf
if s := err.Error(); strings.Contains(s, "signal: killed") || strings.Contains(s, "exit status 1") {
// The browser process is killed when the context is cancelled
// after a k6 iteration ends, so silence the log message until
// we can stop it gracefully. See #https://github.com/grafana/xk6-browser/issues/423
logErr = logger.Debugf
}
logErr(
"browser", "process with PID %d unexpectedly ended: %v",
cmd.Process.Pid, err,
)
}
}()

return cmd, stdout, nil
}

// parseDevToolsURL grabs the websocket address from chrome's output and returns it.
func parseDevToolsURL(ctx context.Context, rc io.Reader) (wsURL string, _ error) {
type result struct {
devToolsURL string
err error
}
c := make(chan result, 1)
go func() {
const prefix = "DevTools listening on "

scanner := bufio.NewScanner(rc)
for scanner.Scan() {
if s := scanner.Text(); strings.HasPrefix(s, prefix) {
c <- result{
strings.TrimPrefix(strings.TrimSpace(s), prefix),
nil,
}
return
}
}
if err := scanner.Err(); err != nil {
c <- result{"", err}
}
}()
select {
case r := <-c:
return r.devToolsURL, r.err
case <-ctx.Done():
return "", fmt.Errorf("%w", ctx.Err())
}
}

// makeLogger makes and returns an extension wide logger.
func makeLogger(ctx context.Context, launchOpts *common.LaunchOptions) (*log.Logger, error) {
var (
Expand Down
115 changes: 111 additions & 4 deletions common/browser_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@
package common

import (
"bufio"
"context"
"fmt"
"io"
"os"
"os/exec"
"strings"

"github.com/grafana/xk6-browser/log"
"github.com/grafana/xk6-browser/storage"
Expand All @@ -49,17 +54,29 @@ type BrowserProcess struct {
}

func NewBrowserProcess(
ctx context.Context, cancel context.CancelFunc, process *os.Process, wsURL string, dataDir *storage.Dir,
) *BrowserProcess {
ctx context.Context, path string, args, env []string, dataDir *storage.Dir,
cancel context.CancelFunc, logger *log.Logger,
) (*BrowserProcess, error) {
cmd, stdout, err := execute(ctx, path, args, env, dataDir, logger)
if err != nil {
return nil, err
}

wsURL, err := parseDevToolsURL(ctx, stdout)
if err != nil {
return nil, fmt.Errorf("getting DevTools URL: %w", err)
}

p := BrowserProcess{
ctx: ctx,
cancel: cancel,
process: process,
process: cmd.Process,
lostConnection: make(chan struct{}),
processIsGracefullyClosing: make(chan struct{}),
wsURL: wsURL,
userDataDir: dataDir,
}

go func() {
// If we lose connection to the browser and we're not in-progress with clean
// browser-initiated termination then cancel the context to clean up.
Expand All @@ -70,7 +87,8 @@ func NewBrowserProcess(
p.cancel()
}
}()
return &p

return &p, nil
}

func (p *BrowserProcess) didLoseConnection() {
Expand Down Expand Up @@ -113,3 +131,92 @@ func (p *BrowserProcess) Pid() int {
func (p *BrowserProcess) AttachLogger(logger *log.Logger) {
p.logger = logger
}

func execute(
ctx context.Context, path string, args, env []string, dataDir *storage.Dir,
logger *log.Logger,
) (*exec.Cmd, io.Reader, error) {
cmd := exec.CommandContext(ctx, path, args...)
killAfterParent(cmd)

stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, nil, fmt.Errorf("%w", err)
}
cmd.Stderr = cmd.Stdout

// Set up environment variable for process
if len(env) > 0 {
cmd.Env = append(os.Environ(), env...)
}

// We must start the cmd before calling cmd.Wait, as otherwise the two
// can run into a data race.
err = cmd.Start()
if os.IsNotExist(err) {
return nil, nil, fmt.Errorf("file does not exist: %s", path)
}
if err != nil {
return nil, nil, fmt.Errorf("%w", err)
}
if ctx.Err() != nil {
return nil, nil, fmt.Errorf("%w", ctx.Err())
}

go func() {
// TODO: How to handle these errors?
defer func() {
if err := dataDir.Cleanup(); err != nil {
logger.Errorf("BrowserType:Close", "cleaning up the user data directory: %v", err)
}
}()

if err := cmd.Wait(); err != nil {
logErr := logger.Errorf
if s := err.Error(); strings.Contains(s, "signal: killed") || strings.Contains(s, "exit status 1") {
// The browser process is killed when the context is cancelled
// after a k6 iteration ends, so silence the log message until
// we can stop it gracefully. See #https://github.com/grafana/xk6-browser/issues/423
logErr = logger.Debugf
}
logErr(
"browser", "process with PID %d unexpectedly ended: %v",
cmd.Process.Pid, err,
)
}
}()

return cmd, stdout, nil
}

// parseDevToolsURL grabs the websocket address from chrome's output and returns it.
func parseDevToolsURL(ctx context.Context, rc io.Reader) (wsURL string, _ error) {
type result struct {
devToolsURL string
err error
}
c := make(chan result, 1)
go func() {
const prefix = "DevTools listening on "

scanner := bufio.NewScanner(rc)
for scanner.Scan() {
if s := scanner.Text(); strings.HasPrefix(s, prefix) {
c <- result{
strings.TrimPrefix(strings.TrimSpace(s), prefix),
nil,
}
return
}
}
if err := scanner.Err(); err != nil {
c <- result{"", err}
}
}()
select {
case r := <-c:
return r.devToolsURL, r.err
case <-ctx.Done():
return "", fmt.Errorf("%w", ctx.Err())
}
}
16 changes: 8 additions & 8 deletions common/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,10 @@ func NewConnection(ctx context.Context, wsURL string, logger *log.Logger) (*Conn
return &c, nil
}

// closeConnection cleanly closes the WebSocket connection.
// close cleanly closes the WebSocket connection.
// Returns an error if sending the close control frame fails.
func (c *Connection) closeConnection(code int) error {
c.logger.Debugf("Connection:closeConnection", "code:%d", code)
func (c *Connection) close(code int) error {
c.logger.Debugf("Connection:close", "code:%d", code)

var err error
c.shutdownOnce.Do(func() {
Expand Down Expand Up @@ -355,7 +355,7 @@ func (c *Connection) recvLoop() {
case session.readCh <- &msg:
case code := <-c.closeCh:
c.logger.Debugf("Connection:recvLoop:<-c.closeCh", "sid:%v tid:%v wsURL:%v crashed:%t", session.id, session.targetID, c.wsURL, session.crashed)
_ = c.closeConnection(code)
_ = c.close(code)
case <-c.done:
c.logger.Debugf("Connection:recvLoop:<-c.done", "sid:%v tid:%v wsURL:%v crashed:%t", session.id, session.targetID, c.wsURL, session.crashed)
return
Expand Down Expand Up @@ -388,7 +388,7 @@ func (c *Connection) send(ctx context.Context, msg *cdproto.Message, recvCh chan
return fmt.Errorf("sending a message to browser: %w", err)
case code := <-c.closeCh:
c.logger.Debugf("Connection:send:<-c.closeCh", "wsURL:%q sid:%v, websocket code:%v", c.wsURL, msg.SessionID, code)
_ = c.closeConnection(code)
_ = c.close(code)
return fmt.Errorf("closing communication with browser: %w", &websocket.CloseError{Code: code})
case <-ctx.Done():
c.logger.Debugf("Connection:send:<-ctx.Done", "wsURL:%q sid:%v err:%v", c.wsURL, msg.SessionID, c.ctx.Err())
Expand Down Expand Up @@ -426,7 +426,7 @@ func (c *Connection) send(ctx context.Context, msg *cdproto.Message, recvCh chan
return err
case code := <-c.closeCh:
c.logger.Debugf("Connection:send:<-c.closeCh #2", "sid:%v tid:%v wsURL:%q, websocket code:%v", msg.SessionID, tid, c.wsURL, code)
_ = c.closeConnection(code)
_ = c.close(code)
return &websocket.CloseError{Code: code}
case <-c.done:
c.logger.Debugf("Connection:send:<-c.done #2", "sid:%v tid:%v wsURL:%q", msg.SessionID, tid, c.wsURL)
Expand Down Expand Up @@ -476,7 +476,7 @@ func (c *Connection) sendLoop() {
}
case code := <-c.closeCh:
c.logger.Debugf("Connection:sendLoop:<-c.closeCh", "wsURL:%q code:%d", c.wsURL, code)
_ = c.closeConnection(code)
_ = c.close(code)
return
case <-c.done:
c.logger.Debugf("Connection:sendLoop:<-c.done#2", "wsURL:%q", c.wsURL)
Expand All @@ -494,7 +494,7 @@ func (c *Connection) Close(args ...goja.Value) {
code = int(args[0].ToInteger())
}
c.logger.Debugf("connection:Close", "wsURL:%q code:%d", c.wsURL, code)
_ = c.closeConnection(code)
_ = c.close(code)
}

// Execute implements cdproto.Executor and performs a synchronous send and receive.
Expand Down
2 changes: 1 addition & 1 deletion chromium/kill_linux.go → common/kill_linux.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//go:build linux
// +build linux

package chromium
package common

import (
"os/exec"
Expand Down
2 changes: 1 addition & 1 deletion chromium/kill_other.go → common/kill_other.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//go:build !linux
// +build !linux

package chromium
package common

import "os/exec"

Expand Down

0 comments on commit 430e82c

Please sign in to comment.