Skip to content

Commit

Permalink
Parse captive core toml to implement better merging
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Apr 24, 2021
1 parent fcd228f commit e142246
Show file tree
Hide file tree
Showing 21 changed files with 567 additions and 233 deletions.
33 changes: 20 additions & 13 deletions exp/services/captivecore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
"github.com/stellar/go/support/config"
support "github.com/stellar/go/support/config"
"github.com/stellar/go/support/db"
supporthttp "github.com/stellar/go/support/http"
supportlog "github.com/stellar/go/support/log"
)

func main() {
var port int
var networkPassphrase, binaryPath, configAppendPath, dbURL string
var networkPassphrase, binaryPath, dbURL string
var captiveCoreTomlParams ledgerbackend.CaptiveCoreTomlParams
var historyArchiveURLs []string
var stellarCoreHTTPPort uint
var checkpointFrequency uint32
var logLevel logrus.Level
logger := supportlog.New()
Expand Down Expand Up @@ -56,9 +57,9 @@ func main() {
Name: "captive-core-config-append-path",
OptType: types.String,
FlagDefault: "",
Required: false,
Required: true,
Usage: "path to additional configuration for the Stellar Core configuration file used by captive core. It must, at least, include enough details to define a quorum set",
ConfigKey: &configAppendPath,
ConfigKey: &captiveCoreTomlParams.ConfigPath,
},
&config.ConfigOption{
Name: "history-archive-urls",
Expand Down Expand Up @@ -97,12 +98,13 @@ func main() {
Usage: "horizon postgres database to connect with",
},
&config.ConfigOption{
Name: "stellar-captive-core-http-port",
ConfigKey: &stellarCoreHTTPPort,
OptType: types.Uint,
FlagDefault: uint(11626),
Required: false,
Usage: "HTTP port for captive core to listen on (0 disables the HTTP server)",
Name: "stellar-captive-core-http-port",
ConfigKey: &captiveCoreTomlParams.HTTPPort,
OptType: types.Uint,
CustomSetValue: support.SetOptionalUint,
Required: false,
FlagDefault: uint(0),
Usage: "HTTP port for Captive Core to listen on (0 disables the HTTP server)",
},
&config.ConfigOption{
Name: "checkpoint-frequency",
Expand All @@ -121,19 +123,24 @@ func main() {
configOpts.SetValues()
logger.SetLevel(logLevel)

captiveCoreTomlParams.HistoryArchiveURLs = historyArchiveURLs
captiveCoreTomlParams.NetworkPassphrase = networkPassphrase
captiveCoreToml, err := ledgerbackend.NewCaptiveCoreToml(captiveCoreTomlParams)
if err != nil {
logger.WithError(err).Fatal("Invalid captive core toml")
}

captiveConfig := ledgerbackend.CaptiveCoreConfig{
BinaryPath: binaryPath,
ConfigAppendPath: configAppendPath,
NetworkPassphrase: networkPassphrase,
HistoryArchiveURLs: historyArchiveURLs,
CheckpointFrequency: checkpointFrequency,
HTTPPort: stellarCoreHTTPPort,
Log: logger.WithField("subservice", "stellar-core"),
Toml: captiveCoreToml,
}

var dbConn *db.Session
if len(dbURL) > 0 {
var err error
dbConn, err = db.Open("postgres", dbURL)
if err != nil {
logger.WithError(err).Fatal("Could not create db connection instance")
Expand Down
1 change: 0 additions & 1 deletion exp/tools/captive-core-start-tester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func check(ledger uint32) bool {
c, err := ledgerbackend.NewCaptive(
ledgerbackend.CaptiveCoreConfig{
BinaryPath: "stellar-core",
ConfigAppendPath: "stellar-core-standalone2.cfg",
NetworkPassphrase: "Standalone Network ; February 2017",
HistoryArchiveURLs: []string{"http://localhost:1570"},
},
Expand Down
1 change: 0 additions & 1 deletion ingest/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func Example_changes() {
backend, err := ledgerbackend.NewCaptive(
ledgerbackend.CaptiveCoreConfig{
BinaryPath: "/bin/stellar-core",
ConfigAppendPath: "/opt/stellar-core.cfg",
NetworkPassphrase: networkPassphrase,
HistoryArchiveURLs: []string{archiveURL},
},
Expand Down
13 changes: 1 addition & 12 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,11 @@ type CaptiveStellarCore struct {
type CaptiveCoreConfig struct {
// BinaryPath is the file path to the Stellar Core binary
BinaryPath string
// ConfigAppendPath is the file path to additional configuration for the Stellar Core configuration file used
// by captive core. This field is only required when ingesting in online mode (e.g. UnboundedRange).
ConfigAppendPath string
// NetworkPassphrase is the Stellar network passphrase used by captive core when connecting to the Stellar network
NetworkPassphrase string
// HistoryArchiveURLs are a list of history archive urls
HistoryArchiveURLs []string
Toml *CaptiveCoreToml

// Optional fields

Expand All @@ -121,18 +119,9 @@ type CaptiveCoreConfig struct {
CheckpointFrequency uint32
// LedgerHashStore is an optional store used to obtain hashes for ledger sequences from a trusted source
LedgerHashStore TrustedLedgerHashStore
// HTTPPort is the TCP port to listen for requests (defaults to 0, which disables the HTTP server)
HTTPPort uint
// PeerPort is the TCP port to bind to for connecting to the Stellar network
// (defaults to 11625). It may be useful for example when there's >1 Stellar-Core
// instance running on a machine.
PeerPort uint
// Log is an (optional) custom logger which will capture any output from the Stellar Core process.
// If Log is omitted then all output will be printed to stdout.
Log *log.Entry
// LogPath is the (optional) path in which to store Core logs, passed along
// to Stellar Core's LOG_FILE_PATH
LogPath string
// Context is the (optional) context which controls the lifetime of a CaptiveStellarCore instance. Once the context is done
// the CaptiveStellarCore instance will not be able to stream ledgers from Stellar Core or spawn new
// instances of Stellar Core. If Context is omitted CaptiveStellarCore will default to using context.Background.
Expand Down
6 changes: 4 additions & 2 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,12 @@ type testLedgerHeader struct {

func TestCaptiveNew(t *testing.T) {
executablePath := "/etc/stellar-core"
configPath := "/etc/stellar-core.cfg"
networkPassphrase := network.PublicNetworkPassphrase
historyURLs := []string{"http://history.stellar.org/prd/core-live/core_live_001"}

captiveStellarCore, err := NewCaptive(
CaptiveCoreConfig{
BinaryPath: executablePath,
ConfigAppendPath: configPath,
NetworkPassphrase: networkPassphrase,
HistoryArchiveURLs: historyURLs,
},
Expand Down Expand Up @@ -631,11 +629,15 @@ func TestCaptiveStellarCore_PrepareRangeAfterClose(t *testing.T) {
networkPassphrase := network.PublicNetworkPassphrase
historyURLs := []string{"http://localhost"}

captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{})
assert.NoError(t, err)

captiveStellarCore, err := NewCaptive(
CaptiveCoreConfig{
BinaryPath: executablePath,
NetworkPassphrase: networkPassphrase,
HistoryArchiveURLs: historyURLs,
Toml: captiveCoreToml,
},
)
assert.NoError(t, err)
Expand Down
99 changes: 23 additions & 76 deletions ingest/ledgerbackend/stellar_core_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,7 @@ type pipe struct {
}

type stellarCoreRunner struct {
logPath string
executablePath string
configAppendPath string
networkPassphrase string
historyURLs []string
httpPort uint
peerPort uint
mode stellarCoreRunnerMode
executablePath string

started bool
wg sync.WaitGroup
Expand Down Expand Up @@ -112,84 +105,49 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode)
ctx, cancel := context.WithCancel(config.Context)

runner := &stellarCoreRunner{
logPath: config.LogPath,
executablePath: config.BinaryPath,
configAppendPath: config.ConfigAppendPath,
networkPassphrase: config.NetworkPassphrase,
historyURLs: config.HistoryArchiveURLs,
httpPort: config.HTTPPort,
peerPort: config.PeerPort,
mode: mode,
ctx: ctx,
cancel: cancel,
storagePath: fullStoragePath,
executablePath: config.BinaryPath,
ctx: ctx,
cancel: cancel,
storagePath: fullStoragePath,
nonce: fmt.Sprintf(
"captive-stellar-core-%x",
rand.New(rand.NewSource(time.Now().UnixNano())).Uint64(),
),
log: config.Log,
}

if err := runner.writeConf(); err != nil {
if conf, err := writeConf(config.Toml, mode, runner.getConfFileName()); err != nil {
return nil, errors.Wrap(err, "error writing configuration")
} else {
runner.log.Debugf("captive core config file contents:\n%s", conf)
}

return runner, nil
}

func (r *stellarCoreRunner) generateConfig() (string, error) {
if r.mode == stellarCoreRunnerModeOnline && r.configAppendPath == "" {
return "", errors.New("stellar-core append config file path cannot be empty in online mode")
}

lines := []string{
"# Generated file -- do not edit",
"NODE_IS_VALIDATOR=false",
"DISABLE_XDR_FSYNC=true",
fmt.Sprintf(`NETWORK_PASSPHRASE="%s"`, r.networkPassphrase),
// Note: We don't pass BUCKET_DIR_PATH here because it's created
// *relative* to the storage path (i.e. where the config lives).
fmt.Sprintf(`HTTP_PORT=%d`, r.httpPort),
fmt.Sprintf(`LOG_FILE_PATH="%s"`, r.logPath),
}

if r.peerPort > 0 {
lines = append(lines, fmt.Sprintf(`PEER_PORT=%d`, r.peerPort))
func writeConf(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode, location string) (string, error) {
text, err := generateConfig(captiveCoreToml, mode)
if err != nil {
return "", err
}

if r.mode == stellarCoreRunnerModeOffline {
// In offline mode, there is no need to connect to peers
lines = append(lines, "RUN_STANDALONE=true")
// We don't need consensus when catching up
lines = append(lines, "UNSAFE_QUORUM=true")
}
return string(text), ioutil.WriteFile(location, text, 0644)
}

if r.mode == stellarCoreRunnerModeOffline && r.configAppendPath == "" {
// Add a fictional quorum -- necessary to convince core to start up;
// but not used at all for our purposes. Pubkey here is just random.
lines = append(lines,
"[QUORUM_SET]",
"THRESHOLD_PERCENT=100",
`VALIDATORS=["GCZBOIAY4HLKAJVNJORXZOZRAY2BJDBZHKPBHZCRAIUR5IHC2UHBGCQR"]`)
func generateConfig(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode) ([]byte, error) {
if mode == stellarCoreRunnerModeOffline {
captiveCoreToml = captiveCoreToml.CatchupToml()
}

result := strings.ReplaceAll(strings.Join(lines, "\n"), `\`, `\\`) + "\n\n"
if r.configAppendPath != "" {
appendConfigContents, err := ioutil.ReadFile(r.configAppendPath)
if err != nil {
return "", errors.Wrap(err, "reading quorum config file")
}
result += string(appendConfigContents) + "\n\n"
if !captiveCoreToml.QuorumSetIsConfigured() {
return nil, errors.New("stellar-core append config file does not define any quorum set")
}

lines = []string{}
for i, val := range r.historyURLs {
lines = append(lines, fmt.Sprintf("[HISTORY.h%d]", i))
lines = append(lines, fmt.Sprintf(`get="curl -sf %s/{0} -o {1}"`, val))
text, err := captiveCoreToml.Marshall()
if err != nil {
return nil, errors.Wrap(err, "could not marshal captive core config")
}
result += strings.Join(lines, "\n")

return result, nil
return text, nil
}

func (r *stellarCoreRunner) getConfFileName() string {
Expand Down Expand Up @@ -255,17 +213,6 @@ func (r *stellarCoreRunner) getLogLineWriter() io.Writer {
return wr
}

// Makes the temp directory and writes the config file to it; called by the
// platform-specific captiveStellarCore.Start() methods.
func (r *stellarCoreRunner) writeConf() error {
conf, err := r.generateConfig()
if err != nil {
return err
}
r.log.Debugf("captive core config file contents:\n%s", conf)
return ioutil.WriteFile(r.getConfFileName(), []byte(conf), 0644)
}

func (r *stellarCoreRunner) createCmd(params ...string) *exec.Cmd {
allParams := append([]string{"--conf", r.getConfFileName()}, params...)
cmd := exec.CommandContext(r.ctx, r.executablePath, allParams...)
Expand Down
32 changes: 19 additions & 13 deletions ingest/ledgerbackend/stellar_core_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (
"github.com/stellar/go/support/log"
)

func newUint(v uint) *uint {
p := new(uint)
*p = v
return p
}

func TestGenerateConfig(t *testing.T) {
for _, testCase := range []struct {
name string
Expand Down Expand Up @@ -39,36 +45,36 @@ func TestGenerateConfig(t *testing.T) {
},
} {
t.Run(testCase.name, func(t *testing.T) {
stellarCoreRunner, err := newStellarCoreRunner(CaptiveCoreConfig{
HTTPPort: 6789,
HistoryArchiveURLs: []string{"http://localhost:1170"},
Log: log.New(),
ConfigAppendPath: testCase.appendPath,
StoragePath: "./test-temp-dir",
PeerPort: 12345,
Context: context.Background(),
captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{
ConfigPath: testCase.appendPath,
Strict: true,
NetworkPassphrase: "Public Global Stellar Network ; September 2015",
}, testCase.mode)
HistoryArchiveURLs: []string{"http://localhost:1170"},
HTTPPort: newUint(6789),
PeerPort: newUint(12345),
})
assert.NoError(t, err)

config, err := stellarCoreRunner.generateConfig()
configBytes, err := generateConfig(captiveCoreToml, testCase.mode)
assert.NoError(t, err)

expectedByte, err := ioutil.ReadFile(testCase.expectedPath)
assert.NoError(t, err)

assert.Equal(t, config, string(expectedByte))

assert.NoError(t, stellarCoreRunner.close())
assert.Equal(t, string(configBytes), string(expectedByte))
})
}
}

func TestCloseBeforeStart(t *testing.T) {
captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{})
assert.NoError(t, err)

runner, err := newStellarCoreRunner(CaptiveCoreConfig{
HistoryArchiveURLs: []string{"http://localhost"},
Log: log.New(),
Context: context.Background(),
Toml: captiveCoreToml,
}, stellarCoreRunnerModeOffline)
assert.NoError(t, err)

Expand Down
29 changes: 16 additions & 13 deletions ingest/ledgerbackend/testdata/expected-offline-core.cfg
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
# Generated file -- do not edit
NODE_IS_VALIDATOR=false
DISABLE_XDR_FSYNC=true
NETWORK_PASSPHRASE="Public Global Stellar Network ; September 2015"
HTTP_PORT=6789
LOG_FILE_PATH=""
PEER_PORT=12345
RUN_STANDALONE=true
UNSAFE_QUORUM=true
[QUORUM_SET]
THRESHOLD_PERCENT=100
VALIDATORS=["GCZBOIAY4HLKAJVNJORXZOZRAY2BJDBZHKPBHZCRAIUR5IHC2UHBGCQR"]
LOG_FILE_PATH = ""
HTTP_PORT = 0
NETWORK_PASSPHRASE = "Public Global Stellar Network ; September 2015"
PEER_PORT = 12345
FAILURE_SAFETY = 0
UNSAFE_QUORUM = true
RUN_STANDALONE = true
DISABLE_XDR_FSYNC = true

[HISTORY]
[HISTORY.h0]
get = "curl -sf http://localhost:1170/{0} -o {1}"

[HISTORY.h0]
get="curl -sf http://localhost:1170/{0} -o {1}"
[QUORUM_SET]
[QUORUM_SET.generated]
THRESHOLD_PERCENT = 100
VALIDATORS = ["GCZBOIAY4HLKAJVNJORXZOZRAY2BJDBZHKPBHZCRAIUR5IHC2UHBGCQR"]
Loading

0 comments on commit e142246

Please sign in to comment.