Skip to content

Commit

Permalink
ingest: Render captive core logs as Horizon logs (#3189)
Browse files Browse the repository at this point in the history
We go from the following raw lines in the log output:
    GABCD [category LEVEL] stuff here
to the following log-formatted lines:
    LVEL[time] category: stuff here
  • Loading branch information
Shaptic authored Nov 9, 2020
1 parent a10c000 commit ef1e30d
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 17 deletions.
34 changes: 24 additions & 10 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -88,6 +89,9 @@ type CaptiveStellarCore struct {
// waitIntervalPrepareRange defines a time to wait between checking if the buffer
// is empty. Default 1s, lower in tests to make them faster.
waitIntervalPrepareRange time.Duration

// Optionally, pass along a custom logger to the underlying runner.
log *log.Entry
}

// NewCaptive returns a new CaptiveStellarCore.
Expand All @@ -105,17 +109,27 @@ func NewCaptive(executablePath, configPath, networkPassphrase string, historyURL
return nil, errors.Wrap(err, "error connecting to history archive")
}

return &CaptiveStellarCore{
archive: archive,
executablePath: executablePath,
configPath: configPath,
historyURLs: historyURLs,
networkPassphrase: networkPassphrase,
stellarCoreRunnerFactory: func(configPath2 string) (stellarCoreRunnerInterface, error) {
return newStellarCoreRunner(executablePath, configPath2, networkPassphrase, historyURLs)
},
c := &CaptiveStellarCore{
archive: archive,
executablePath: executablePath,
configPath: configPath,
historyURLs: historyURLs,
networkPassphrase: networkPassphrase,
waitIntervalPrepareRange: time.Second,
}, nil
}
c.stellarCoreRunnerFactory = func(configPath2 string) (stellarCoreRunnerInterface, error) {
runner, innerErr := newStellarCoreRunner(executablePath, configPath2, networkPassphrase, historyURLs)
if innerErr != nil {
return runner, innerErr
}
runner.setLogger(c.log)
return runner, nil
}
return c, nil
}

func (c *CaptiveStellarCore) SetStellarCoreLogger(logger *log.Entry) {
c.log = logger
}

func (c *CaptiveStellarCore) getLatestCheckpointSequence() (uint32, error) {
Expand Down
3 changes: 3 additions & 0 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/network"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -53,6 +54,8 @@ func (m *stellarCoreRunnerMock) close() error {
return a.Error(0)
}

func (m *stellarCoreRunnerMock) setLogger(*log.Entry) {}

func buildLedgerCloseMeta(sequence uint32) xdr.LedgerCloseMeta {
opResults := []xdr.OperationResult{}
opMeta := []xdr.OperationMeta{}
Expand Down
50 changes: 45 additions & 5 deletions ingest/ledgerbackend/stellar_core_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/pkg/errors"
"github.com/stellar/go/support/log"
)

type stellarCoreRunnerInterface interface {
Expand All @@ -25,6 +26,7 @@ type stellarCoreRunnerInterface interface {
getProcessExitChan() <-chan struct{}
// getProcessExitError returns an exit error of the process, can be nil
getProcessExitError() error
setLogger(*log.Entry)
close() error
}

Expand All @@ -39,13 +41,17 @@ type stellarCoreRunner struct {
shutdown chan struct{}

cmd *exec.Cmd

// processExit channel receives an error when the process exited with an error
// or nil if the process exited without an error.
processExit chan struct{}
processExitError error
metaPipe io.Reader
tempDir string
nonce string

// Optionally, logging can be done to something other than stdout.
Log *log.Entry
}

func newStellarCoreRunner(executablePath, configPath, networkPassphrase string, historyURLs []string) (*stellarCoreRunner, error) {
Expand Down Expand Up @@ -111,22 +117,52 @@ func (r *stellarCoreRunner) getConfFileName() string {
return filepath.Join(r.tempDir, "stellar-core.conf")
}

func (*stellarCoreRunner) getLogLineWriter() io.Writer {
r, w := io.Pipe()
br := bufio.NewReader(r)
func (r *stellarCoreRunner) getLogLineWriter() io.Writer {
rd, wr := io.Pipe()
br := bufio.NewReader(rd)

// Strip timestamps from log lines from captive stellar-core. We emit our own.
dateRx := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3} `)
go func() {
levelRx := regexp.MustCompile(`G[A-Z]{4} \[(\w+) ([A-Z]+)\] (.*)`)
for {
line, err := br.ReadString('\n')
if err != nil {
break
}
line = dateRx.ReplaceAllString(line, "")
fmt.Print(line)

// If there's a logger, we attempt to extract metadata about the log
// entry, then redirect it to the logger. Otherwise, we just use stdout.
if r.Log == nil {
fmt.Print(line)
continue
}

matches := levelRx.FindStringSubmatch(line)
if len(matches) >= 4 {
// Extract the substrings from the log entry and trim it
category, level := matches[1], matches[2]
line = matches[3]

levelMapping := map[string]func(string, ...interface{}){
"FATAL": r.Log.Errorf,
"ERROR": r.Log.Errorf,
"WARNING": r.Log.Warnf,
"INFO": r.Log.Infof,
}

if writer, ok := levelMapping[strings.ToUpper(level)]; ok {
writer("%s: %s", category, line)
} else {
r.Log.Infof(line)
}
} else {
r.Log.Infof(line)
}
}
}()
return w
return wr
}

// Makes the temp directory and writes the config file to it; called by the
Expand Down Expand Up @@ -224,6 +260,10 @@ func (r *stellarCoreRunner) getProcessExitError() error {
return r.processExitError
}

func (r *stellarCoreRunner) setLogger(logger *log.Entry) {
r.Log = logger
}

func (r *stellarCoreRunner) close() error {
var err1, err2 error

Expand Down
7 changes: 5 additions & 2 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ func NewSystem(config Config) (System, error) {
return nil, errors.Wrap(err, "error creating captive core backend")
}
} else {
//
ledgerBackend, err = ledgerbackend.NewCaptive(
var captiveCoreBackend *ledgerbackend.CaptiveStellarCore
captiveCoreBackend, err = ledgerbackend.NewCaptive(
config.StellarCoreBinaryPath,
config.StellarCoreConfigPath,
config.NetworkPassphrase,
Expand All @@ -182,6 +182,9 @@ func NewSystem(config Config) (System, error) {
cancel()
return nil, errors.Wrap(err, "error creating captive core backend")
}
captiveCoreBackend.SetStellarCoreLogger(
log.WithField("subservice", "stellar-core"))
ledgerBackend = captiveCoreBackend
}
} else {
coreSession := config.CoreSession.Clone()
Expand Down

0 comments on commit ef1e30d

Please sign in to comment.