Skip to content

Commit

Permalink
Merge branch 'master' into arnikola/address-nits
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Dec 5, 2019
2 parents 8b7e574 + 54cdb87 commit d11df61
Show file tree
Hide file tree
Showing 24 changed files with 688 additions and 242 deletions.
19 changes: 7 additions & 12 deletions src/cmd/services/m3aggregator/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/m3db/m3/src/cmd/services/m3aggregator/config"
"github.com/m3db/m3/src/cmd/services/m3aggregator/serve"
xconfig "github.com/m3db/m3/src/x/config"
"github.com/m3db/m3/src/x/config/configflag"
"github.com/m3db/m3/src/x/etcd"
"github.com/m3db/m3/src/x/instrument"

Expand All @@ -42,25 +44,18 @@ const (
gracefulShutdownTimeout = 15 * time.Second
)

var (
configFile = flag.String("f", "", "configuration file")
)

func main() {
flag.Parse()
var cfgOpts configflag.Options
cfgOpts.Register()

if len(*configFile) == 0 {
flag.Usage()
os.Exit(1)
}
flag.Parse()

// Set globals for etcd related packages.
etcd.SetGlobals()

var cfg config.Configuration
if err := xconfig.LoadFile(&cfg, *configFile, xconfig.Options{}); err != nil {
fmt.Printf("error loading config file: %v\n", err)
os.Exit(1)
if err := cfgOpts.MainLoad(&cfg, xconfig.Options{}); err != nil {
log.Fatal(err.Error())
}

// Create logger and metrics scope.
Expand Down
20 changes: 11 additions & 9 deletions src/cmd/services/m3collector/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,31 @@ package main

import (
"flag"
"log"
_ "net/http/pprof" // pprof: for debug listen server if configured
"os"

"github.com/m3db/m3/src/cmd/services/m3collector/config"
"github.com/m3db/m3/src/collector/server"
xconfig "github.com/m3db/m3/src/x/config"
"github.com/m3db/m3/src/x/config/configflag"
"github.com/m3db/m3/src/x/etcd"
)

var (
configFile = flag.String("f", "", "configuration file")
)

func main() {
var configOpts configflag.Options
configOpts.Register()

flag.Parse()

if len(*configFile) == 0 {
flag.Usage()
os.Exit(1)
var cfg config.Configuration
if err := configOpts.MainLoad(&cfg, xconfig.Options{}); err != nil {
log.Fatal(err.Error())
}

// Set globals for etcd related packages.
etcd.SetGlobals()

server.Run(server.RunOptions{
ConfigFile: *configFile,
Config: cfg,
})
}
22 changes: 12 additions & 10 deletions src/cmd/services/m3coordinator/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,31 @@ package main

import (
"flag"
"log"
_ "net/http/pprof" // pprof: for debug listen server if configured
"os"

"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/query/server"
xconfig "github.com/m3db/m3/src/x/config"
"github.com/m3db/m3/src/x/config/configflag"
"github.com/m3db/m3/src/x/etcd"
)

var configFiles xconfig.FlagStringSlice

func main() {
flag.Var(&configFiles, "f", "configuration file(s)")
flag.Parse()
var cfgOpts configflag.Options
cfgOpts.Register()

if len(configFiles) == 0 || len(configFiles[0]) == 0 {
flag.Usage()
os.Exit(1)
}
flag.Parse()

// Set globals for etcd related packages.
etcd.SetGlobals()

var cfg config.Configuration
if err := cfgOpts.MainLoad(&cfg, xconfig.Options{}); err != nil {
log.Fatal(err.Error())
}

server.Run(server.RunOptions{
ConfigFiles: configFiles,
Config: cfg,
})
}
19 changes: 10 additions & 9 deletions src/cmd/services/m3ctl/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"strconv"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/m3db/m3/src/ctl/service/r2"
"github.com/m3db/m3/src/x/clock"
xconfig "github.com/m3db/m3/src/x/config"
"github.com/m3db/m3/src/x/config/configflag"
"github.com/m3db/m3/src/x/etcd"
"github.com/m3db/m3/src/x/instrument"
)
Expand All @@ -47,21 +49,20 @@ const (
)

func main() {
configFile := flag.String("f", "m3ctl.yml", "configuration file")
flag.Parse()

if len(*configFile) == 0 {
flag.Usage()
os.Exit(1)
configOpts := configflag.Options{
ConfigFiles: configflag.FlagStringSlice{Value: []string{"m3ctl.yml"}},
}

configOpts.Register()

flag.Parse()

// Set globals for etcd related packages.
etcd.SetGlobals()

var cfg config.Configuration
if err := xconfig.LoadFile(&cfg, *configFile, xconfig.Options{}); err != nil {
fmt.Printf("error loading config file: %v\n", err)
os.Exit(1)
if err := configOpts.MainLoad(&cfg, xconfig.Options{}); err != nil {
log.Fatal(err.Error())
}

rawLogger, err := cfg.Logging.BuildLogger()
Expand Down
19 changes: 7 additions & 12 deletions src/cmd/services/m3dbnode/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package main
import (
"flag"
"fmt"
"log"
_ "net/http/pprof" // pprof: for debug listen server if configured
"os"
"os/signal"
Expand All @@ -34,29 +35,23 @@ import (
dbserver "github.com/m3db/m3/src/dbnode/server"
coordinatorserver "github.com/m3db/m3/src/query/server"
xconfig "github.com/m3db/m3/src/x/config"
"github.com/m3db/m3/src/x/config/configflag"
"github.com/m3db/m3/src/x/etcd"
xos "github.com/m3db/m3/src/x/os"
)

var (
configFile = flag.String("f", "", "configuration file")
)

func main() {
flag.Parse()
var cfgOpts configflag.Options
cfgOpts.Register()

if len(*configFile) == 0 {
flag.Usage()
os.Exit(1)
}
flag.Parse()

// Set globals for etcd related packages.
etcd.SetGlobals()

var cfg config.Configuration
if err := xconfig.LoadFile(&cfg, *configFile, xconfig.Options{}); err != nil {
fmt.Fprintf(os.Stderr, "unable to load config from %s: %v\n", *configFile, err)
os.Exit(1)
if err := cfgOpts.MainLoad(&cfg, xconfig.Options{}); err != nil {
log.Fatal(err.Error())
}

if err := cfg.InitDefaultsAndValidate(); err != nil {
Expand Down
21 changes: 11 additions & 10 deletions src/cmd/services/m3query/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,30 @@ package main

import (
"flag"
"log"
_ "net/http/pprof" // pprof: for debug listen server if configured
"os"

"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/query/server"
xconfig "github.com/m3db/m3/src/x/config"
"github.com/m3db/m3/src/x/config/configflag"
"github.com/m3db/m3/src/x/etcd"
)

var configFiles xconfig.FlagStringSlice

func main() {
flag.Var(&configFiles, "f", "configuration file(s)")
flag.Parse()
var configOpts configflag.Options
configOpts.Register()

if len(configFiles) == 0 || len(configFiles[0]) == 0 {
flag.Usage()
os.Exit(1)
}
flag.Parse()

// Set globals for etcd related packages.
etcd.SetGlobals()

var cfg config.Configuration
if err := configOpts.MainLoad(&cfg, xconfig.Options{}); err != nil {
log.Fatal(err.Error())
}
server.Run(server.RunOptions{
ConfigFiles: configFiles,
Config: cfg,
})
}
16 changes: 2 additions & 14 deletions src/collector/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,7 @@ import (
// RunOptions provides options for running the server
// with backwards compatibility if only solely adding fields.
type RunOptions struct {
// ConfigFile is the config file to use.
ConfigFile string

// Config is an alternate way to provide configuration and will be used
// instead of parsing ConfigFile if ConfigFile is not specified.
// Config will be used to configure the application.
Config config.Configuration

// InterruptCh is a programmatic interrupt channel to supply to
Expand All @@ -58,15 +54,7 @@ type RunOptions struct {

// Run runs the server programmatically given a filename for the configuration file.
func Run(runOpts RunOptions) {
var cfg config.Configuration
if runOpts.ConfigFile != "" {
if err := xconfig.LoadFile(&cfg, runOpts.ConfigFile, xconfig.Options{}); err != nil {
fmt.Fprintf(os.Stderr, "unable to load %s: %v", runOpts.ConfigFile, err)
os.Exit(1)
}
} else {
cfg = runOpts.Config
}
cfg := runOpts.Config

ctx := context.Background()
logger, err := cfg.Logging.Build()
Expand Down
4 changes: 3 additions & 1 deletion src/dbnode/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,10 @@ func (c Configuration) NewAdminClient(
size = *c.AsyncWriteWorkerPoolSize
}

workerPoolInstrumentOpts := iopts.SetMetricsScope(writeRequestScope.SubScope("workerpool"))
workerPoolOpts := xsync.NewPooledWorkerPoolOptions().
SetGrowOnDemand(true)
SetGrowOnDemand(true).
SetInstrumentOptions(workerPoolInstrumentOpts)
workerPool, err := xsync.NewPooledWorkerPool(size, workerPoolOpts)
if err != nil {
return nil, fmt.Errorf("unable to create async worker pool: %v", err)
Expand Down
12 changes: 6 additions & 6 deletions src/dbnode/persist/fs/read_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,12 +494,6 @@ func TestWriterOnlyWritesNonNilBytes(t *testing.T) {
filePathPrefix := filepath.Join(dir, "")
defer os.RemoveAll(dir)

checkedBytes := func(b []byte) checked.Bytes {
r := checked.NewBytes(b, nil)
r.IncRef()
return r
}

w := newTestWriter(t, filePathPrefix)
writerOpts := DataWriterOpenOptions{
BlockSize: testBlockSize,
Expand All @@ -526,3 +520,9 @@ func TestWriterOnlyWritesNonNilBytes(t *testing.T) {
{"foo", nil, []byte{1, 2, 3, 4, 5, 6}},
})
}

func checkedBytes(b []byte) checked.Bytes {
r := checked.NewBytes(b, nil)
r.IncRef()
return r
}
26 changes: 17 additions & 9 deletions src/dbnode/persist/fs/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,7 @@ func (w *writer) Open(opts DataWriterOpenOptions) error {
blockStart = opts.Identifier.BlockStart
volumeIndex = opts.Identifier.VolumeIndex
)

w.blockSize = opts.BlockSize
w.start = blockStart
w.volumeIndex = volumeIndex
w.snapshotTime = opts.Snapshot.SnapshotTime
w.snapshotID = opts.Snapshot.SnapshotID
w.currIdx = 0
w.currOffset = 0
w.err = nil
w.reset(opts)

var (
shardDir string
Expand Down Expand Up @@ -229,6 +221,22 @@ func (w *writer) Open(opts DataWriterOpenOptions) error {
return nil
}

func (w *writer) reset(opts DataWriterOpenOptions) {
w.blockSize = opts.BlockSize
w.start = opts.Identifier.BlockStart
w.volumeIndex = opts.Identifier.VolumeIndex
w.snapshotTime = opts.Snapshot.SnapshotTime
w.snapshotID = opts.Snapshot.SnapshotID
w.currIdx = 0
w.currOffset = 0
w.err = nil
// This happens after writing the previous set of files index files, however, do it
// again to ensure they get cleared even if there was a premature error writing out the
// previous set of files which would have prevented them from being cleared.
w.indexEntries.releaseRefs()
w.indexEntries = w.indexEntries[:0]
}

func (w *writer) writeData(data []byte) error {
if len(data) == 0 {
return nil
Expand Down
Loading

0 comments on commit d11df61

Please sign in to comment.