diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 62c2b8c47cbe..1df0396344dd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -63,6 +63,7 @@ https://github.com/elastic/beats/compare/v6.6.0...6.x[Check the HEAD diff] - Gracefully handle TLS options when enrolling a Beat. {issue}9129[9129] - The backing off now implements jitter to better distribute the load. {issue}10172[10172] - Fix TLS certificate DoS vulnerability. {pull}10303[10303] +- Fix panic and file unlock in spool on atomic operation (arm, x86-32). File lock was not released when panic occurs, leading to the beat deadlocking on startup. {pull}10289[10289] *Auditbeat* diff --git a/NOTICE.txt b/NOTICE.txt index cb82b0d8f8ba..ad796f1283cd 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -556,8 +556,8 @@ Elasticsearch, B.V. (https://www.elastic.co/). -------------------------------------------------------------------- Dependency: github.com/elastic/go-txfile -Version: v0.0.3 -Revision: 389b527ad365f6fc6cf5fa7e0ba5a2294ad2f3ed +Version: v0.0.6 +Revision: fa3d87c14381c01aaa1c7901877fb5e483294964 License type (autodetected): Apache-2.0 ./vendor/github.com/elastic/go-txfile/LICENSE: -------------------------------------------------------------------- diff --git a/libbeat/publisher/queue/spool/inbroker.go b/libbeat/publisher/queue/spool/inbroker.go index 98354149f908..784880ea847e 100644 --- a/libbeat/publisher/queue/spool/inbroker.go +++ b/libbeat/publisher/queue/spool/inbroker.go @@ -127,6 +127,9 @@ func (b *inBroker) Producer(cfg queue.ProducerConfig) queue.Producer { // run in the same go-routine as the Flush was executed from. // Only the (*inBroker).eventLoop triggers a flush. func (b *inBroker) onFlush(n uint) { + log := b.ctx.logger + log.Debug("inbroker: onFlush ", n) + if n == 0 { return } @@ -379,7 +382,7 @@ func (b *inBroker) stateWithTimer() bool { b.handleCancel(&req) case <-b.timer.C: - log.Debug("inbroker (stateWithTimer): flush timeout") + log.Debug("inbroker (stateWithTimer): flush timeout", b.bufferedEvents) b.timer.Stop(true) @@ -394,7 +397,7 @@ func (b *inBroker) stateWithTimer() bool { if b.bufferedEvents > 0 { // flush did not push all events? Restart timer. - log.Debug(" inbroker (stateWithTimer): start flush timer") + log.Debug(" inbroker (stateWithTimer): start flush timer", b.bufferedEvents) b.timer.Start() break } diff --git a/libbeat/publisher/queue/spool/outbroker.go b/libbeat/publisher/queue/spool/outbroker.go index f2fb878eaa04..e29a2b4edbb7 100644 --- a/libbeat/publisher/queue/spool/outbroker.go +++ b/libbeat/publisher/queue/spool/outbroker.go @@ -83,7 +83,18 @@ var errRetry = errors.New("retry") func newOutBroker(ctx *spoolCtx, qu *pq.Queue, flushTimeout time.Duration) (*outBroker, error) { reader := qu.Reader() - avail, err := reader.Available() + + var ( + avail uint + err error + ) + func() { + if err = reader.Begin(); err != nil { + return + } + defer reader.Done() + avail, err = reader.Available() + }() if err != nil { return nil, err } diff --git a/vendor/github.com/elastic/go-txfile/CHANGELOG.md b/vendor/github.com/elastic/go-txfile/CHANGELOG.md index 0799e3261aff..2a060d4a6c05 100644 --- a/vendor/github.com/elastic/go-txfile/CHANGELOG.md +++ b/vendor/github.com/elastic/go-txfile/CHANGELOG.md @@ -14,6 +14,26 @@ This project adheres to [Semantic Versioning](http://semver.org/). ### Fixed +## [0.0.6] + +### Fixed +- Fix flush callback not being executed on success. PR #34 + +## [0.0.5] + +### Fixed +- Panic on atomic operation (arm, x86-32) and File lock not released when panic occurs. PR #31 + +## [0.0.4] + +### Added +- Added `Observer` to txfile for collecting per transaction metrics. PR #23 +- Make file syncing configurable. PR #29 +- Added `Observer` to pq package for collecting operational metrics. PR #26 + +### Changed +- Queue reader requires explicit transaction start/stop calls. PR #27 + ## [0.0.3] ### Fixed @@ -42,6 +62,9 @@ This project adheres to [Semantic Versioning](http://semver.org/). - Windows Fix: Max mmaped area must not exceed actual file size on windows. PR #11 -[Unreleased]: https://github.com/elastic/go-txfile/compare/v0.0.3...HEAD +[Unreleased]: https://github.com/elastic/go-txfile/compare/v0.0.6...HEAD +[0.0.6]: https://github.com/elastic/go-txfile/compare/v0.0.5...v0.0.6 +[0.0.5]: https://github.com/elastic/go-txfile/compare/v0.0.4...v0.0.5 +[0.0.4]: https://github.com/elastic/go-txfile/compare/v0.0.3...v0.0.4 [0.0.3]: https://github.com/elastic/go-txfile/compare/v0.0.2...v0.0.3 [0.0.2]: https://github.com/elastic/go-txfile/compare/v0.0.1...v0.0.2 diff --git a/vendor/github.com/elastic/go-txfile/alloc.go b/vendor/github.com/elastic/go-txfile/alloc.go index 0fa74a26786d..3d1be0950d2a 100644 --- a/vendor/github.com/elastic/go-txfile/alloc.go +++ b/vendor/github.com/elastic/go-txfile/alloc.go @@ -89,6 +89,7 @@ type ( data txAllocArea meta txAllocArea options txAllocOptions // per transaction allocation options + stats txAllocStats } txAllocArea struct { @@ -107,6 +108,17 @@ type ( overflowAreaEnabled bool // enable allocating pages with ID > maxPages for metadata metaGrowPercentage int // limit of meta area in use, so to allocate new pages into the meta area } + + txAllocStats struct { + data txAllocAreaStats + meta txAllocAreaStats + overflow txAllocAreaStats // overflow region allocations/frees + toMeta uint // number of pages moved from data area to meta area + } + + txAllocAreaStats struct { + alloc, freed uint + } ) // allocCommitState keeps track of the new allocator state during the commit. @@ -164,9 +176,6 @@ func (a *allocator) fileCommitPrepare( ) { st.tx = tx st.updated = forceUpdate || tx.Updated() - if st.updated { - a.MetaAllocator().FreeRegions(tx, a.freelistPages) - } } func (a *allocator) fileCommitAlloc(st *allocCommitState) reason { @@ -222,6 +231,8 @@ func (a *allocator) fileCommitAlloc(st *allocCommitState) reason { // Remove pages from end of overflow area from meta freelist + adjust end marker st.metaList, st.overflowFreed = releaseOverflowPages(newMetaList, a.maxPages, metaEndMarker) if st.overflowFreed > 0 { + st.tx.stats.overflow.freed += st.overflowFreed + newEnd := metaEndMarker - PageID(st.overflowFreed) if metaEndMarker > dataEndMarker { // shrink overflow area, which was allocated from data area dataEndMarker = newEnd @@ -373,6 +384,21 @@ func (a *allocArea) rollback(st *txAllocArea) { // metaManager // ----------- +func (mm *metaManager) onGrow(st *txAllocState, n uint, overflow bool) { + if overflow { + st.stats.overflow.alloc += n + } + st.stats.toMeta += n +} + +func (mm *metaManager) onAlloc(st *txAllocState, n uint) { + st.stats.meta.alloc++ +} + +func (mm *metaManager) onFree(st *txAllocState, n uint) { + st.stats.meta.freed++ +} + func (mm *metaManager) DataAllocator() *dataAllocator { return (*dataAllocator)(mm) } @@ -449,9 +475,7 @@ func (mm *metaManager) tryGrow( } da.AllocRegionsWith(st, avail, func(reg region) { - st.manager.moveToMeta.Add(reg) - mm.metaTotal += uint(reg.count) - mm.meta.freelist.AddRegion(reg) + mm.transferToMeta(st, reg) }) // allocate from overflow area @@ -461,7 +485,9 @@ func (mm *metaManager) tryGrow( } allocFromArea(&st.meta, &mm.meta.endMarker, required, func(reg region) { // st.manager.fromOverflow.Add(reg) - mm.metaTotal += uint(reg.count) + n := uint(reg.count) + mm.onGrow(st, n, true) + mm.metaTotal += n mm.meta.freelist.AddRegion(reg) }) if mm.maxPages == 0 && mm.data.endMarker < mm.meta.endMarker { @@ -474,23 +500,28 @@ func (mm *metaManager) tryGrow( // Enough memory available in data area. Try to allocate continuous region first reg := da.AllocContinuousRegion(st, count) if reg.id != 0 { - st.manager.moveToMeta.Add(reg) - mm.metaTotal += uint(reg.count) - mm.meta.freelist.AddRegion(reg) + mm.transferToMeta(st, reg) return true } // no continuous memory block -> allocate single regions n := da.AllocRegionsWith(st, count, func(reg region) { - st.manager.moveToMeta.Add(reg) - mm.metaTotal += uint(reg.count) - mm.meta.freelist.AddRegion(reg) + mm.transferToMeta(st, reg) }) return n == count } +func (mm *metaManager) transferToMeta(st *txAllocState, reg region) { + n := uint(reg.count) + st.manager.moveToMeta.Add(reg) + mm.onGrow(st, n, false) + mm.metaTotal += uint(reg.count) + mm.meta.freelist.AddRegion(reg) +} + func (mm *metaManager) Free(st *txAllocState, id PageID) { // mark page as freed for now + mm.onFree(st, 1) st.meta.freed.Add(id) } @@ -540,6 +571,14 @@ func (a *dataAllocator) Avail(_ *txAllocState) uint { return avail } +func (a *dataAllocator) onAlloc(st *txAllocState, n uint) { + st.stats.data.alloc += n +} + +func (a *dataAllocator) onFree(st *txAllocState, n uint) { + st.stats.data.freed += n +} + func (a *dataAllocator) AllocContinuousRegion( st *txAllocState, n uint, @@ -551,6 +590,7 @@ func (a *dataAllocator) AllocContinuousRegion( reg := allocContFromFreelist(&a.data.freelist, &st.data, allocFromBeginning, n) if reg.id != 0 { + a.onAlloc(st, n) return reg } @@ -564,6 +604,8 @@ func (a *dataAllocator) AllocContinuousRegion( if a.meta.endMarker < a.data.endMarker { a.meta.endMarker = a.data.endMarker } + + a.onAlloc(st, n) return reg } @@ -589,6 +631,8 @@ func (a *dataAllocator) AllocRegionsWith( a.meta.endMarker = a.data.endMarker } } + + a.onAlloc(st, count) return count } @@ -599,6 +643,8 @@ func (a *dataAllocator) Free(st *txAllocState, id PageID) { panic(fmt.Sprintf("freed page ID %v out of bounds", id)) } + a.onFree(st, 1) + if !st.data.new.Has(id) { // fast-path, page has not been allocated in current transaction st.data.freed.Add(id) @@ -656,6 +702,8 @@ func (a *walAllocator) Alloc(st *txAllocState) PageID { if reg.id == 0 { return 0 } + + mm.onAlloc(st, 1) st.meta.allocated.Add(reg.id) return reg.id } @@ -666,7 +714,9 @@ func (a *walAllocator) AllocRegionsWith(st *txAllocState, n uint, fn func(region return 0 } - return allocFromFreelist(&a.meta.freelist, &st.meta, allocFromBeginning, n, fn) + count := allocFromFreelist(&a.meta.freelist, &st.meta, allocFromBeginning, n, fn) + mm.onAlloc(st, count) + return count } func (a *walAllocator) Free(st *txAllocState, id PageID) { @@ -692,7 +742,9 @@ func (a *metaAllocator) AllocRegionsWith( return 0 } - return allocFromFreelist(&a.meta.freelist, &st.meta, allocFromEnd, n, fn) + count := allocFromFreelist(&a.meta.freelist, &st.meta, allocFromEnd, n, fn) + mm.onAlloc(st, count) + return count } func (a *metaAllocator) AllocRegions(st *txAllocState, n uint) regionList { diff --git a/vendor/github.com/elastic/go-txfile/dev-tools/lib/mage/gotool/go.go b/vendor/github.com/elastic/go-txfile/dev-tools/lib/mage/gotool/go.go new file mode 100644 index 000000000000..ed82eb745aed --- /dev/null +++ b/vendor/github.com/elastic/go-txfile/dev-tools/lib/mage/gotool/go.go @@ -0,0 +1,255 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package gotool + +import ( + "os" + "strings" + + "github.com/magefile/mage/mg" + "github.com/magefile/mage/sh" +) + +// Args holds parameters, environment variables and flag information used to +// pass to the go tool. +type Args struct { + extra map[string]string // extra flags one can pass to the command + env map[string]string + flags map[string]string + pos []string +} + +// ArgOpt is a functional option adding info to Args once executed. +type ArgOpt func(args *Args) + +type goTest func(opts ...ArgOpt) error + +// Test runs `go test` and provides optionals for adding command line arguments. +var Test goTest = runGoTest + +// ListProjectPackages lists all packages in the current project +func ListProjectPackages() ([]string, error) { + return ListPackages("./...") +} + +// ListPackages calls `go list` for every package spec given. +func ListPackages(pkgs ...string) ([]string, error) { + return getLines(callGo(nil, "list", pkgs...)) +} + +// ListTestFiles lists all go and cgo test files available in a package. +func ListTestFiles(pkg string) ([]string, error) { + const tmpl = `{{ range .TestGoFiles }}{{ printf "%s\n" . }}{{ end }}` + + `{{ range .XTestGoFiles }}{{ printf "%s\n" . }}{{ end }}` + + return getLines(callGo(nil, "list", "-f", tmpl, pkg)) +} + +// HasTests returns true if the given package contains test files. +func HasTests(pkg string) (bool, error) { + files, err := ListTestFiles(pkg) + if err != nil { + return false, err + } + return len(files) > 0, nil +} + +func (goTest) WithCoverage(to string) ArgOpt { + return combine(flagArg("-cover", ""), flagArgIf("-test.coverprofile", to)) +} +func (goTest) Short(b bool) ArgOpt { return flagBoolIf("-test.short", b) } +func (goTest) Use(bin string) ArgOpt { return extraArgIf("use", bin) } +func (goTest) OS(os string) ArgOpt { return envArgIf("GOOS", os) } +func (goTest) ARCH(arch string) ArgOpt { return envArgIf("GOARCH", arch) } +func (goTest) Create() ArgOpt { return flagArg("-c", "") } +func (goTest) Out(path string) ArgOpt { return flagArg("-o", path) } +func (goTest) Package(path string) ArgOpt { return posArg(path) } +func (goTest) Verbose() ArgOpt { return flagArg("-test.v", "") } +func runGoTest(opts ...ArgOpt) error { + args := buildArgs(opts) + if bin := args.Val("use"); bin != "" { + flags := map[string]string{} + for k, v := range args.flags { + if strings.HasPrefix(k, "-test.") { + flags[k] = v + } + } + + useArgs := &Args{} + *useArgs = *args + useArgs.flags = flags + + _, err := sh.Exec(useArgs.env, os.Stdout, os.Stderr, bin, useArgs.build()...) + return err + } + + return runVGo("test", args) +} + +func getLines(out string, err error) ([]string, error) { + if err != nil { + return nil, err + } + + lines := strings.Split(out, "\n") + res := lines[:0] + for _, line := range lines { + line = strings.TrimSpace(line) + if len(line) > 0 { + res = append(res, line) + } + } + + return res, nil +} + +func callGo(env map[string]string, cmd string, opts ...string) (string, error) { + args := []string{cmd} + args = append(args, opts...) + return sh.OutputWith(env, mg.GoCmd(), args...) +} + +func runVGo(cmd string, args *Args) error { + return execGoWith(func(env map[string]string, cmd string, args ...string) error { + _, err := sh.Exec(env, os.Stdout, os.Stderr, cmd, args...) + return err + }, cmd, args) +} + +func runGo(cmd string, args *Args) error { + return execGoWith(sh.RunWith, cmd, args) +} + +func execGoWith( + fn func(map[string]string, string, ...string) error, + cmd string, args *Args, +) error { + cliArgs := []string{cmd} + cliArgs = append(cliArgs, args.build()...) + return fn(args.env, mg.GoCmd(), cliArgs...) +} + +func posArg(value string) ArgOpt { + return func(a *Args) { a.Add(value) } +} + +func extraArg(k, v string) ArgOpt { + return func(a *Args) { a.Extra(k, v) } +} + +func extraArgIf(k, v string) ArgOpt { + if v == "" { + return nil + } + return extraArg(k, v) +} + +func envArg(k, v string) ArgOpt { + return func(a *Args) { a.Env(k, v) } +} + +func envArgIf(k, v string) ArgOpt { + if v == "" { + return nil + } + return envArg(k, v) +} + +func flagArg(flag, value string) ArgOpt { + return func(a *Args) { a.Flag(flag, value) } +} + +func flagArgIf(flag, value string) ArgOpt { + if value == "" { + return nil + } + return flagArg(flag, value) +} + +func flagBoolIf(flag string, b bool) ArgOpt { + if b { + return flagArg(flag, "") + } + return nil +} + +func combine(opts ...ArgOpt) ArgOpt { + return func(a *Args) { + for _, opt := range opts { + if opt != nil { + opt(a) + } + } + } +} + +func buildArgs(opts []ArgOpt) *Args { + a := &Args{} + combine(opts...)(a) + return a +} + +// Extra sets a special k/v pair to be interpreted by the execution function. +func (a *Args) Extra(k, v string) { + if a.extra == nil { + a.extra = map[string]string{} + } + a.extra[k] = v +} + +// Val returns a special functions value for a given key. +func (a *Args) Val(k string) string { + if a.extra == nil { + return "" + } + return a.extra[k] +} + +// Env sets an environmant variable to be passed to the child process on exec. +func (a *Args) Env(k, v string) { + if a.env == nil { + a.env = map[string]string{} + } + a.env[k] = v +} + +// Flag adds a flag to be passed to the child process on exec. +func (a *Args) Flag(flag, value string) { + if a.flags == nil { + a.flags = map[string]string{} + } + a.flags[flag] = value +} + +// Add adds a positional argument to be passed to the child process on exec. +func (a *Args) Add(p string) { + a.pos = append(a.pos, p) +} + +func (a *Args) build() []string { + args := make([]string, 0, 2*len(a.flags)+len(a.pos)) + for k, v := range a.flags { + args = append(args, k) + if v != "" { + args = append(args, v) + } + } + + args = append(args, a.pos...) + return args +} diff --git a/vendor/github.com/elastic/go-txfile/dev-tools/lib/mage/mgenv/mgenv.go b/vendor/github.com/elastic/go-txfile/dev-tools/lib/mage/mgenv/mgenv.go new file mode 100644 index 000000000000..b065a7d62ef8 --- /dev/null +++ b/vendor/github.com/elastic/go-txfile/dev-tools/lib/mage/mgenv/mgenv.go @@ -0,0 +1,102 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package mgenv + +import ( + "fmt" + "os" + "sort" + "strconv" +) + +// Var holds an environment variables name, default value and doc string. +type Var struct { + name string + other string + doc string +} + +var envVars = map[string]Var{} +var envKeys []string + +func makeVar(name, other, doc string) Var { + if v, exists := envVars[name]; exists { + return v + } + + v := Var{name, other, doc} + envVars[name] = v + envKeys = append(envKeys, name) + sort.Strings(envKeys) + return v +} + +// MakeEnv builds a dictionary of defined environment variables, such that +// these can be passed to other processes (e.g. providers) +func MakeEnv() map[string]string { + m := make(map[string]string, len(envVars)) + for k, v := range envVars { + m[k] = v.Get() + } + return m +} + +// Keys returns the keys of registered environment variables. The keys returned +// are sorted. +// Note: The returned slice must not be changed or appended to. +func Keys() []string { + return envKeys +} + +// Find returns a registered Var by name. +func Find(name string) (Var, bool) { + v, ok := envVars[name] + return v, ok +} + +// String registers an environment variable and reads the current contents. +func String(name, other, doc string) string { + v := makeVar(name, other, doc) + return v.Get() +} + +// Bool registers an environment variable and interprets the current variable as bool. +func Bool(name string, other bool, doc string) bool { + v := makeVar(name, fmt.Sprint(other), doc) + b, err := strconv.ParseBool(v.Get()) + return err == nil && b +} + +// Name returns the environment variables name +func (v Var) Name() string { return v.name } + +// Default returns the environment variables default value as string. +func (v Var) Default() string { return v.other } + +// Doc returns the doc-string. +func (v Var) Doc() string { return v.doc } + +// Get reads an environment variable. Get returns the default value if the +// variable is not present or empty. +func (v Var) Get() string { + val := os.Getenv(v.name) + if val == "" { + return v.Default() + } + return val +} diff --git a/vendor/github.com/elastic/go-txfile/dev-tools/lib/mage/xbuild/docker.go b/vendor/github.com/elastic/go-txfile/dev-tools/lib/mage/xbuild/docker.go new file mode 100644 index 000000000000..9c4f8f1d667a --- /dev/null +++ b/vendor/github.com/elastic/go-txfile/dev-tools/lib/mage/xbuild/docker.go @@ -0,0 +1,74 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package xbuild + +import ( + "fmt" + + "github.com/magefile/mage/sh" +) + +// DockerImage provides based on downloadable docker images. +type DockerImage struct { + Image string + Workdir string + Volumes map[string]string + Env map[string]string +} + +// Build pulls the required image. +func (p DockerImage) Build() error { + return sh.Run("docker", "pull", p.Image) +} + +// Run executes the command in a temporary docker container. The container is +// deleted after its execution. +func (p DockerImage) Run(env map[string]string, cmdAndArgs ...string) error { + spec := []string{"run", "--rm", "-i", "-t"} + for k, v := range mergeEnv(p.Env, env) { + spec = append(spec, "-e", fmt.Sprintf("%v=%v", k, v)) + } + for k, v := range p.Volumes { + spec = append(spec, "-v", fmt.Sprintf("%v:%v", k, v)) + } + if w := p.Workdir; w != "" { + spec = append(spec, "-w", w) + } + + spec = append(spec, p.Image) + for _, v := range cmdAndArgs { + if v != "" { + spec = append(spec, v) + } + } + + return sh.RunV("docker", spec...) +} + +func mergeEnv(a, b map[string]string) map[string]string { + merged := make(map[string]string, len(a)+len(b)) + copyEnv(merged, a) + copyEnv(merged, b) + return merged +} + +func copyEnv(to, from map[string]string) { + for k, v := range from { + to[k] = v + } +} diff --git a/vendor/github.com/elastic/go-txfile/dev-tools/lib/mage/xbuild/xbuild.go b/vendor/github.com/elastic/go-txfile/dev-tools/lib/mage/xbuild/xbuild.go new file mode 100644 index 000000000000..d9699cc1fdc2 --- /dev/null +++ b/vendor/github.com/elastic/go-txfile/dev-tools/lib/mage/xbuild/xbuild.go @@ -0,0 +1,71 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package xbuild + +import ( + "fmt" + + "github.com/magefile/mage/mg" +) + +// Registry of available cross build environment providers. +type Registry struct { + table map[OSArch]Provider +} + +// Provider defines available functionality all cross build providers MUST implement. +type Provider interface { + // Build the environment + Build() error + + // Run command within environment. + Run(env map[string]string, cmdAndArgs ...string) error +} + +// OSArch tuple. +type OSArch struct { + OS string + Arch string +} + +// NewRegistry creates a new Regsitry. +func NewRegistry(tbl map[OSArch]Provider) *Registry { + return &Registry{tbl} +} + +// Find finds a provider by OS and Architecture name. +// Returns error if no provider can be found. +func (r *Registry) Find(os, arch string) (Provider, error) { + p := r.table[OSArch{os, arch}] + if p == nil { + return nil, fmt.Errorf("No provider for %v:%v defined", os, arch) + } + return p, nil +} + +// With calls fn with a provider matching the requires OS and ARCH. Returns +// and error if no provider can be found or function itself errors. +func (r *Registry) With(os, arch string, fn func(Provider) error) error { + p, err := r.Find(os, arch) + if err != nil { + return err + } + + mg.Deps(p.Build) + return fn(p) +} diff --git a/vendor/github.com/elastic/go-txfile/file.go b/vendor/github.com/elastic/go-txfile/file.go index e714df851056..65f3580f7ce8 100644 --- a/vendor/github.com/elastic/go-txfile/file.go +++ b/vendor/github.com/elastic/go-txfile/file.go @@ -36,10 +36,19 @@ import ( // pages of type PageSize. Pages within the file are only accessible by page IDs // from within active transactions. type File struct { - path string - readonly bool - file vfs.File - size int64 // real file size + // Atomic fields. + // Do not move: Must be 64bit-word aligned on some architectures. + txids uint64 + + observer Observer + + path string + readonly bool + file vfs.File + + size int64 // real file size (updated on mmap update only) + sizeEstimate int64 // estimated real file size based on last update and the total vs. used mmaped region + locks lock wg sync.WaitGroup // local async workers wait group writer writer @@ -53,7 +62,7 @@ type File struct { meta [2]*metaPage metaActive int - txids uint64 + stats FileStats } // internal contants @@ -66,6 +75,17 @@ const ( minRequiredFileSize = initSize ) +var maxMmapSize uint + +func init() { + if math.MaxUint32 == maxUint { + maxMmapSize = 2 * sz1GB + } else { + tmp := uint64(0x1FFFFFFFFFFF) + maxMmapSize = uint(tmp) + } +} + // Open opens or creates a new transactional file. // Open tries to create the file, if the file does not exist yet. Returns an // error if file access fails, file can not be locked or file meta pages are @@ -99,6 +119,8 @@ func Open(path string, mode os.FileMode, opts Options) (*File, error) { tracef("open file: %p (%v)\n", f, path) traceMetaPage(f.getMetaPage()) + + f.reportOpen() return f, nil } @@ -179,6 +201,7 @@ func newFile( maxSize: maxSize, pageSize: pageSize, }, + observer: opts.Observer, } f.locks.init() @@ -196,7 +219,7 @@ func newFile( "page limit not configured on allocator") // create asynchronous writer - f.writer.Init(file, f.allocator.pageSize) + f.writer.Init(file, f.allocator.pageSize, opts.Sync) f.wg.Add(1) go func() { defer f.wg.Done() @@ -226,6 +249,36 @@ func (f *File) init(metaActive int, opts Options) reason { return nil } +func (f *File) reportOpen() { + const numFileHeaders = 2 + + meta := f.getMetaPage() + fileEnd := uint(meta.dataEndMarker.Get()) + if m := uint(meta.metaEndMarker.Get()); m > fileEnd { + fileEnd = m + } + + metaArea := uint(meta.metaTotal.Get()) + metaInUse := metaArea - f.allocator.meta.freelist.Avail() + dataInUse := fileEnd - numFileHeaders - metaArea - f.allocator.data.freelist.Avail() + + f.stats = FileStats{ + Version: meta.version.Get(), + Size: uint64(f.size), + MaxSize: meta.maxSize.Get(), + PageSize: meta.pageSize.Get(), + MetaArea: metaArea, + DataAllocated: dataInUse, + MetaAllocated: metaInUse, + } + + o := f.observer + if o == nil { + return + } + o.OnOpen(f.stats) +} + // Close closes the file, after all transactions have been quit. After closing // a file, no more transactions can be started. func (f *File) Close() error { @@ -296,13 +349,25 @@ func (f *File) beginTx(settings TxOptions) (*Tx, reason) { } tracef("request new transaction (readonly: %v)\n", settings.Readonly) + + // Acquire transaction log. + // Unlock on panic, so applications will not be blocked in case they try to + // defer some close operations on the file. + ok := false lock := f.locks.TxLock(settings.Readonly) lock.Lock() - tracef("init new transaction (readonly: %v)\n", settings.Readonly) + defer cleanup.IfNot(&ok, lock.Unlock) txid := atomic.AddUint64(&f.txids, 1) + + tracef("init new transaction (readonly: %v)\n", settings.Readonly) + tx := newTx(f, txid, lock, settings) tracef("begin transaction: %p (readonly: %v)\n", tx, settings.Readonly) + + tx.onBegin() + + ok = true return tx, nil } @@ -383,6 +448,7 @@ func (f *File) mmap() reason { return f.err(op).of(InvalidFileSize).report(msg) } f.size = fileSize + f.sizeEstimate = fileSize // reset estimate maxSize := f.allocator.maxSize if em := uint(f.allocator.meta.endMarker); maxSize > 0 && em > f.allocator.maxPages { @@ -587,14 +653,6 @@ func readMeta(f vfs.File, off int64) (metaPage, reason) { // That is, exponential grows with values of 64KB, 128KB, 512KB, 1024KB, and so on. // Once 1GB is reached, the mmaped area is always a multiple of 1GB. func computeMmapSize(minSize, maxSize, pageSize uint) (uint, reason) { - var maxMapSize uint - if math.MaxUint32 == maxUint { - maxMapSize = 2 * sz1GB - } else { - tmp := uint64(0x1FFFFFFFFFFF) - maxMapSize = uint(tmp) - } - if maxSize != 0 { // return maxSize as multiple of pages. Round downwards in case maxSize // is not multiple of pages @@ -623,7 +681,7 @@ func computeMmapSize(minSize, maxSize, pageSize uint) (uint, reason) { // allocate number of 1GB blocks to fulfill minSize sz := ((minSize + (sz1GB - 1)) / sz1GB) * sz1GB - if sz > maxMapSize { + if sz > maxMmapSize { return 0, raiseInvalidParamf("mmap size of %v bytes is too large", sz) } diff --git a/vendor/github.com/elastic/go-txfile/magefile.go b/vendor/github.com/elastic/go-txfile/magefile.go new file mode 100644 index 000000000000..7ea7530a4007 --- /dev/null +++ b/vendor/github.com/elastic/go-txfile/magefile.go @@ -0,0 +1,288 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//+build mage + +package main + +import ( + "errors" + "fmt" + "os" + "path" + "path/filepath" + "runtime" + "strings" + + "github.com/magefile/mage/mg" + "github.com/magefile/mage/sh" + + "github.com/elastic/go-txfile/dev-tools/lib/mage/gotool" + "github.com/elastic/go-txfile/dev-tools/lib/mage/mgenv" + "github.com/elastic/go-txfile/dev-tools/lib/mage/xbuild" +) + +// Info namespace is used to print additional docs, help messages, and other info. +type Info mg.Namespace + +// Prepare namespace is used to prepare/download/build common depenendencies for other tasks to run. +type Prepare mg.Namespace + +// Check runs pre-build checks on the environment and source code. (e.g. linters) +type Check mg.Namespace + +// Build namespace defines the set of build targets +type Build mg.Namespace + +const buildHome = "build" + +// environment variables +var ( + envBuildOS = mgenv.String("BUILD_OS", runtime.GOOS, "(string) set compiler target GOOS") + envBuildArch = mgenv.String("BUILD_ARCH", runtime.GOARCH, "(string) set compiler target GOARCH") + envTestUseBin = mgenv.Bool("TEST_USE_BIN", false, "(bool) reuse prebuild test binary when running tests") + envTestShort = mgenv.Bool("TEST_SHORT", false, "(bool) run tests with -short flag") + envFailFast = mgenv.Bool("FAIL_FAST", false, "(bool) do not run other tasks on failure") +) + +var xProviders = xbuild.NewRegistry(map[xbuild.OSArch]xbuild.Provider{ + xbuild.OSArch{"linux", "arm"}: &xbuild.DockerImage{ + Image: "balenalib/revpi-core-3-alpine-golang:latest-edge-build", + Workdir: "/go/src/github.com/elastic/go-txfile", + Volumes: map[string]string{ + filepath.Join(os.Getenv("GOPATH"), "src"): "/go/src", + }, + }, +}) + +// targets + +// Env prints environment info +func (Info) Env() { + printTitle("Mage environment variables") + for _, k := range mgenv.Keys() { + v, _ := mgenv.Find(k) + fmt.Printf("%v=%v\n", k, v.Get()) + } + fmt.Println() + + printTitle("Go info") + sh.RunV(mg.GoCmd(), "env") + fmt.Println() + + printTitle("docker info") + sh.RunV("docker", "version") +} + +// Vars prints the list of registered environment variables +func (Info) Vars() { + for _, k := range mgenv.Keys() { + v, _ := mgenv.Find(k) + fmt.Printf("%v=%v : %v\n", k, v.Default(), v.Doc()) + } +} + +// All runs all Prepare tasks +func (Prepare) All() { mg.Deps(Prepare.Dirs) } + +// Dirs creates requires build directories for storing artifacts +func (Prepare) Dirs() error { return mkdir("build") } + +// Lint runs golint +func (Check) Lint() error { + return errors.New("TODO: implement me") +} + +// Clean removes build artifacts +func Clean() error { + return sh.Rm(buildHome) +} + +// Mage builds the magefile binary for reuse +func (Build) Mage() error { + mg.Deps(Prepare.Dirs) + + goos := envBuildOS + goarch := envBuildArch + out := filepath.Join(buildHome, fmt.Sprintf("mage-%v-%v", goos, goarch)) + return sh.Run("mage", "-f", "-goos="+goos, "-goarch="+goarch, "-compile", out) +} + +// Test builds the per package unit test executables. +func (Build) Test() error { + mg.Deps(Prepare.Dirs) + + return withList(gotool.ListProjectPackages, failFastEach, func(pkg string) error { + tst := gotool.Test + return tst( + tst.OS(envBuildOS), + tst.ARCH(envBuildArch), + tst.Create(), + tst.WithCoverage(""), + tst.Out(path.Join(buildHome, pkg, path.Base(pkg))), + tst.Package(pkg), + ) + }) +} + +// Test runs the unit tests. +func Test() error { + mg.Deps(Prepare.Dirs) + + if crossBuild() { + return withXProvider(func(p xbuild.Provider) error { + mg.Deps(Build.Mage, Build.Test) + + env := mgenv.MakeEnv() + env["TEST_USE_BIN"] = "true" + return p.Run(env, "./build/mage-linux-arm", useIf("-v", mg.Verbose()), "test") + }) + } + + return withList(gotool.ListProjectPackages, failFastEach, func(pkg string) error { + fmt.Println("Test:", pkg) + if b, err := gotool.HasTests(pkg); !b { + fmt.Printf("Skipping %v: No tests found\n", pkg) + return err + } + + home := path.Join(buildHome, pkg) + if err := mkdir(home); err != nil { + return err + } + + tst := gotool.Test + bin := path.Join(home, path.Base(pkg)) + return tst( + tst.Use(useIf(bin, existsFile(bin) && envTestUseBin)), + tst.WithCoverage(path.Join(home, "cover.out")), + tst.Short(envTestShort), + tst.Out(bin), + tst.Package(pkg), + tst.Verbose(), + ) + }) +} + +// helpers + +func withList( + gen func() ([]string, error), + mode func(...func() error) error, + fn func(string) error, +) error { + list, err := gen() + if err != nil { + return err + } + + ops := make([]func() error, len(list)) + for i, v := range list { + v := v + ops[i] = func() error { return fn(v) } + } + + return mode(ops...) +} + +func useIf(s string, b bool) string { + if b { + return s + } + return "" +} + +func existsFile(path string) bool { + fi, err := os.Stat(path) + return err == nil && fi.Mode().IsRegular() +} + +func mkdirs(paths ...string) error { + for _, p := range paths { + if err := mkdir(p); err != nil { + return err + } + } + return nil +} + +func mkdir(path string) error { + return os.MkdirAll(path, os.ModeDir|0700) +} + +func failFastEach(ops ...func() error) error { + mode := each + if envFailFast { + mode = and + } + return mode(ops...) +} + +func each(ops ...func() error) error { + var errs []error + for _, op := range ops { + if err := op(); err != nil { + errs = append(errs, err) + } + } + return makeErrs(errs) +} + +func and(ops ...func() error) error { + for _, op := range ops { + if err := op(); err != nil { + return err + } + } + return nil +} + +type multiErr []error + +func makeErrs(errs []error) error { + if len(errs) == 0 { + return nil + } + return multiErr(errs) +} + +func (m multiErr) Error() string { + var bld strings.Builder + for _, err := range m { + if bld.Len() > 0 { + bld.WriteByte('\n') + bld.WriteString(err.Error()) + } + } + return bld.String() +} + +func printTitle(s string) { + fmt.Println(s) + for range s { + fmt.Print("=") + } + fmt.Println() +} + +func crossBuild() bool { + return envBuildArch != runtime.GOARCH || envBuildOS != runtime.GOOS +} + +func withXProvider(fn func(p xbuild.Provider) error) error { + return xProviders.With(envBuildOS, envBuildArch, fn) +} diff --git a/vendor/github.com/elastic/go-txfile/observe.go b/vendor/github.com/elastic/go-txfile/observe.go new file mode 100644 index 000000000000..b0602df645cf --- /dev/null +++ b/vendor/github.com/elastic/go-txfile/observe.go @@ -0,0 +1,71 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package txfile + +import "time" + +// Observer defines common callbacks to observe errors, transactions and other +// state changes in txfile. The callbacks must not block, so to not block any +// file operations. +type Observer interface { + + // OnOpen reports initial file stats when successfully open a file. + // + // Memory stats are reported in sizes. Page counts can be derived by dividing + // the sizes by pageSz. + // + // Derived metrics: + // dataAreaSz = maxSz - metaAreaSz // total data area size + // dataAreaActive = dataAreaSz - avail // data area bytes currently in use + OnOpen(stats FileStats) + + // OnBegin reports the start of a new transaction. + OnTxBegin(readonly bool) + + // OnClose is used to signal the end of a transaction. + // + // If readonly is set, the transaction we a readonly transaction. Only the + // Duration, Total, and Accessed fields will be set. + // Only if `commit` is set will the reported stats be affective in upcoming + // file operations (pages written/freed). + OnTxClose(file FileStats, tx TxStats) +} + +// FileStats reports the current file state like version and allocated/free space. +type FileStats struct { + Version uint32 // lates file-header version + Size uint64 // actual file size (changes if file did grow dynamically due to allocations) + MaxSize uint64 // max file size as stores in file header + PageSize uint32 // file page size + MetaArea uint // total pages reserved for the meta area + DataAllocated uint // data pages in use + MetaAllocated uint // meta pages in use +} + +// TxStats contains common statistics collected during the life-cycle of a transaction. +type TxStats struct { + Readonly bool // set if transaction is readonly. In this case only Duration, Total and Accessed will be set. + Commit bool // If set reported stats will be affective in future file operations. Otherwise allocation stats will have no effect. + Duration time.Duration // total duration the transaction was live + Total uint // total number of pages accessed(written, read, changed) during the transaction + Accessed uint // number of accessed existing pages (read) + Allocated uint // temporarily allocated pages + Freed uint // total number of freed pages + Written uint // total number of pages being written to + Updated uint // number of pages with changed contents +} diff --git a/vendor/github.com/elastic/go-txfile/opts.go b/vendor/github.com/elastic/go-txfile/opts.go index 0ffbbfe323c9..05b0c13b3aa1 100644 --- a/vendor/github.com/elastic/go-txfile/opts.go +++ b/vendor/github.com/elastic/go-txfile/opts.go @@ -22,6 +22,9 @@ type Options struct { // Additional flags. Flags Flag + // Configure file sync behavior + Sync SyncMode + // MaxSize sets the maximum file size in bytes. This should be a multiple of PageSize. // If it's not a multiple of PageSize, the actual files maximum size is rounded downwards // to the next multiple of PageSize. @@ -45,6 +48,8 @@ type Options struct { // Open file in readonly mode. Readonly bool + + Observer Observer } // Flag configures file opening behavior. @@ -65,6 +70,25 @@ const ( FlagUpdMaxSize ) +// SyncMode selects the file syncing behavior +type SyncMode uint8 + +const ( + // SyncDefault lets the implementation choose the default syncing mode + SyncDefault SyncMode = iota + + // SyncData prefers fdatasync if available. Still uses fsync (or similar) if + // implementation wants to enforce fsync. + SyncData + + // SyncFull enforces fsync/or similar. + SyncFull + + // SyncNone disable syncing. Do not use this in production environments, as + // this can easily cause file corruption. + SyncNone +) + // Validate checks if all fields in Options are consistent with the File implementation. func (o *Options) Validate() error { if o.Flags.check(FlagUpdMaxSize) { diff --git a/vendor/github.com/elastic/go-txfile/page.go b/vendor/github.com/elastic/go-txfile/page.go index 9584772597d0..786f3bf96b80 100644 --- a/vendor/github.com/elastic/go-txfile/page.go +++ b/vendor/github.com/elastic/go-txfile/page.go @@ -48,6 +48,14 @@ func newPage(tx *Tx, id PageID) *Page { return &Page{id: id, ondiskID: id, tx: tx} } +func (p *Page) onWriteNew() { + p.tx.accessStats.New++ +} + +func (p *Page) onUpdated() { + p.tx.accessStats.Update++ +} + // ID returns the pages PageID. The ID can be used to store a reference // to this page, for use within another transaction. func (p *Page) ID() PageID { return p.id } @@ -70,10 +78,24 @@ func (p *Page) MarkDirty() error { if err := p.canWrite(op); err != nil { return err } - p.flags.dirty = true + + p.setDirty() return nil } +func (p *Page) setDirty() { + if p.flags.dirty { + return + } + + p.flags.dirty = true + if p.flags.new { + p.onWriteNew() + } else { + p.onUpdated() + } +} + // Free marks a page as free. Freeing a dirty page will return an error. // The page will be returned to the allocator when the transaction commits. func (p *Page) Free() error { @@ -203,7 +225,7 @@ func (p *Page) SetBytes(contents []byte) error { p.bytes = contents } - p.flags.dirty = true + p.setDirty() return nil } diff --git a/vendor/github.com/elastic/go-txfile/pq/access.go b/vendor/github.com/elastic/go-txfile/pq/access.go index a3415adceee3..c5b1cedc6aa9 100644 --- a/vendor/github.com/elastic/go-txfile/pq/access.go +++ b/vendor/github.com/elastic/go-txfile/pq/access.go @@ -80,6 +80,10 @@ func (a *access) rootPage(tx *txfile.Tx) (*txfile.Page, error) { return tx.Page(a.rootID) } +func (a *access) RootFileOffset() uintptr { + return a.Offset(a.rootID, uintptr(a.rootOff)) +} + // LoadRootPage accesses the queue root page from within the passed write // transaction. // The Root page it's content is loaded into the write buffer for manipulations. diff --git a/vendor/github.com/elastic/go-txfile/pq/ack.go b/vendor/github.com/elastic/go-txfile/pq/ack.go index 49ca3495c397..df98ada97668 100644 --- a/vendor/github.com/elastic/go-txfile/pq/ack.go +++ b/vendor/github.com/elastic/go-txfile/pq/ack.go @@ -18,6 +18,8 @@ package pq import ( + "time" + "github.com/elastic/go-txfile" "github.com/elastic/go-txfile/internal/invariant" ) @@ -27,6 +29,9 @@ type acker struct { accessor *access active bool + hdrOffset uintptr + observer Observer + totalEventCount uint totalFreedPages uint @@ -40,8 +45,14 @@ type ackState struct { read position // New on-disk read pointer, pointing to first not-yet ACKed event. } -func newAcker(accessor *access, cb func(uint, uint)) *acker { - return &acker{active: true, accessor: accessor, ackCB: cb} +func newAcker(accessor *access, off uintptr, o Observer, cb func(uint, uint)) *acker { + return &acker{ + hdrOffset: off, + observer: o, + active: true, + accessor: accessor, + ackCB: cb, + } } func (a *acker) close() { @@ -66,16 +77,34 @@ func (a *acker) handle(n uint) error { traceln("acker: pq ack events:", n) + start := time.Now() + events, pages, err := a.cleanup(n) + if o := a.observer; o != nil { + failed := err != nil + o.OnQueueACK(a.hdrOffset, ACKStats{ + Duration: time.Since(start), + Failed: failed, + Events: events, + Pages: pages, + }) + } + return err +} + +func (a *acker) cleanup(n uint) (events uint, pages uint, err error) { + const op = "pq/ack-cleanup" + state, err := a.initACK(n) + events, pages = n, uint(len(state.free)) if err != nil { - return a.errWrap(op, err) + return events, pages, a.errWrap(op, err) } // start write transaction to free pages and update the next read offset in // the queue root tx, txErr := a.accessor.BeginCleanup() if txErr != nil { - return a.errWrap(op, txErr).report("failed to init cleanup tx") + return events, pages, a.errWrap(op, txErr).report("failed to init cleanup tx") } defer tx.Close() @@ -83,19 +112,19 @@ func (a *acker) handle(n uint) error { for _, id := range state.free { page, err := tx.Page(id) if err != nil { - return a.errWrapPage(op, err, id).report("can not access page to be freed") + return events, pages, a.errWrapPage(op, err, id).report("can not access page to be freed") } traceln("free page", id) if err := page.Free(); err != nil { - return a.errWrapPage(op, err, id).report("releasing page failed") + return events, pages, a.errWrapPage(op, err, id).report("releasing page failed") } } // update queue header hdrPage, hdr, err := a.accessor.LoadRootPage(tx) if err != nil { - return err + return events, pages, err } a.accessor.WritePosition(&hdr.head, state.head) a.accessor.WritePosition(&hdr.read, state.read) @@ -105,7 +134,7 @@ func (a *acker) handle(n uint) error { traceQueueHeader(hdr) if err := tx.Commit(); err != nil { - return a.errWrap(op, err).report("failed to commit changes") + return events, pages, a.errWrap(op, err).report("failed to commit changes") } a.totalEventCount += n @@ -116,7 +145,7 @@ func (a *acker) handle(n uint) error { a.ackCB(n, uint(len(state.free))) } - return nil + return events, pages, nil } // initACK uses a read-transaction to collect pages to be removed from list and @@ -198,9 +227,9 @@ func (a *acker) queueRange(hdr *queuePage) (head, start, end position) { func (a *acker) collectFreePages(c *txCursor, endID uint64) ([]txfile.PageID, bool, reason) { const op = "pq/collect-acked-pages" var ( - ids []txfile.PageID - firstID, lastID uint64 - cleanAll = false + ids []txfile.PageID + lastID uint64 + cleanAll = false ) for { @@ -209,27 +238,32 @@ func (a *acker) collectFreePages(c *txCursor, endID uint64) ([]txfile.PageID, bo return nil, false, a.errWrap(op, err) } - // stop searching if endID is in the current page + next := hdr.next.Get() + + // stop searching if current page is the last page. The last page must + // be active for the writer to add more events and link new pages. + isWritePage := next == 0 + + // stop searching if endID is in the current write page dataOnlyPage := hdr.off.Get() == 0 // no event starts within this page if !dataOnlyPage { - firstID, lastID = hdr.first.Get(), hdr.last.Get() + lastID = hdr.last.Get() // inc 'lastID', so to hold on current page if endID would point to next // the page. This helps the reader, potentially pointing to the current // page, if next page has not been committed when reading events. lastID++ - if idLessEq(firstID, endID) && idLessEq(endID, lastID) { + // remove page if endID points past current data page + keepPage := isWritePage || idLessEq(endID, lastID) + if keepPage { break } } - // stop searching if current page is the last page. The last page must - // be active for the writer to add more events and link new pages. - lastPage := hdr.next.Get() == 0 - if lastPage { + if isWritePage { cleanAll = true - invariant.Check(lastID+1 == endID, "last event ID and ack event id missmatch") + invariant.Checkf(lastID+1 == endID, "last event ID (%v) and ack event id (%v) missmatch", lastID, endID) break } diff --git a/vendor/github.com/elastic/go-txfile/pq/buffer.go b/vendor/github.com/elastic/go-txfile/pq/buffer.go index bedba29dea46..bf77bbe2887b 100644 --- a/vendor/github.com/elastic/go-txfile/pq/buffer.go +++ b/vendor/github.com/elastic/go-txfile/pq/buffer.go @@ -42,12 +42,18 @@ type buffer struct { eventHdrPage *page eventHdrOffset int eventHdrSize int + + // stats + countPages uint } func newBuffer(pool *pagePool, page *page, pages, pageSize, hdrSz int) *buffer { payloadSz := pageSize - hdrSz avail := payloadSz * pages + tracef("init writer buffer with pages=%v, pageSize=%v, hdrSize=%v, avail=%v\n", + pages, pageSize, hdrSz, avail) + b := &buffer{ head: nil, tail: nil, @@ -72,6 +78,7 @@ func newBuffer(pool *pagePool, page *page, pages, pageSize, hdrSz int) *buffer { b.avail -= contentsLength b.payload = page.Data[page.Meta.EndOff:] b.page = page + b.countPages++ } return b @@ -97,7 +104,7 @@ func (b *buffer) Append(data []byte) { data = data[n:] b.avail -= n - tracef("writer: append %v bytes to (page: %v, off: %v)\n", n, b.page.Meta.ID, b.page.Meta.EndOff) + tracef("writer: append %v bytes to (page: %v, off: %v, avail: %v)\n", n, b.page.Meta.ID, b.page.Meta.EndOff, b.avail) b.page.Meta.EndOff += uint32(n) } @@ -120,10 +127,12 @@ func (b *buffer) advancePage() { } func (b *buffer) newPage() *page { + b.countPages++ return b.pool.NewPage() } func (b *buffer) releasePage(p *page) { + b.countPages-- b.pool.Release(p) } @@ -193,29 +202,43 @@ func (b *buffer) CommitEvent(id uint64) { // Pages returns start and end page to be serialized. // The `end` page must not be serialized -func (b *buffer) Pages() (start, end *page) { +func (b *buffer) Pages() (start, end *page, n uint) { + traceln("get buffer active page range") + if b.head == nil || !b.head.Dirty() { - return nil, nil + traceln("buffer empty") + return nil, nil, 0 } if b.eventHdrPage == nil { + traceln("no active page") + if b.tail.Dirty() { - return b.head, nil + traceln("tail is dirty") + return b.head, nil, b.countPages } + + traceln("tail is not dirty") for current := b.head; current != nil; current = current.Next { if !current.Dirty() { - return b.head, current + return b.head, current, n } + n++ } invariant.Unreachable("tail if list dirty and not dirty?") } end = b.eventHdrPage + n = b.countPages if end.Dirty() { + traceln("active page is dirty") end = end.Next + } else { + traceln("active page is clean") + n-- } - return b.head, end + return b.head, end, n } // Reset removes all but the last page non-dirty page from the buffer. diff --git a/vendor/github.com/elastic/go-txfile/pq/errkind_string.go b/vendor/github.com/elastic/go-txfile/pq/errkind_string.go index 5858f7258d18..6d37de8ab494 100644 --- a/vendor/github.com/elastic/go-txfile/pq/errkind_string.go +++ b/vendor/github.com/elastic/go-txfile/pq/errkind_string.go @@ -4,9 +4,9 @@ package pq import "strconv" -const _ErrKind_name = "no errorfailed to initialize queueinvalid parameterinvalid page sizeinvalid queue configqueue is already closedreader is already closedwriter is already closedno queue rootqueue root is invalidunsupported queue versioninvalid ack on empty queuetoo many events ackedfailed to seek to next pagefailed to read page" +const _ErrKind_name = "no errorfailed to initialize queueinvalid parameterinvalid page sizeinvalid queue configqueue is already closedreader is already closedwriter is already closedno queue rootqueue root is invalidunsupported queue versioninvalid ack on empty queuetoo many events ackedfailed to seek to next pagefailed to read pageno active transactionunexpected active transaction" -var _ErrKind_index = [...]uint16{0, 8, 34, 51, 68, 88, 111, 135, 159, 172, 193, 218, 244, 265, 292, 311} +var _ErrKind_index = [...]uint16{0, 8, 34, 51, 68, 88, 111, 135, 159, 172, 193, 218, 244, 265, 292, 311, 332, 361} func (i ErrKind) String() string { if i < 0 || i >= ErrKind(len(_ErrKind_index)-1) { diff --git a/vendor/github.com/elastic/go-txfile/pq/error.go b/vendor/github.com/elastic/go-txfile/pq/error.go index 66244c1cd2bf..dc000276fe0c 100644 --- a/vendor/github.com/elastic/go-txfile/pq/error.go +++ b/vendor/github.com/elastic/go-txfile/pq/error.go @@ -58,21 +58,23 @@ type errorCtx struct { //go:generate stringer -type=ErrKind -linecomment=true const ( - NoError ErrKind = iota // no error - InitFailed // failed to initialize queue - InvalidParam // invalid parameter - InvalidPageSize // invalid page size - InvalidConfig // invalid queue config - QueueClosed // queue is already closed - ReaderClosed // reader is already closed - WriterClosed // writer is already closed - NoQueueRoot // no queue root - InvalidQueueRoot // queue root is invalid - QueueVersion // unsupported queue version - ACKEmptyQueue // invalid ack on empty queue - ACKTooMany // too many events acked - SeekFail // failed to seek to next page - ReadFail // failed to read page + NoError ErrKind = iota // no error + InitFailed // failed to initialize queue + InvalidParam // invalid parameter + InvalidPageSize // invalid page size + InvalidConfig // invalid queue config + QueueClosed // queue is already closed + ReaderClosed // reader is already closed + WriterClosed // writer is already closed + NoQueueRoot // no queue root + InvalidQueueRoot // queue root is invalid + QueueVersion // unsupported queue version + ACKEmptyQueue // invalid ack on empty queue + ACKTooMany // too many events acked + SeekFail // failed to seek to next page + ReadFail // failed to read page + InactiveTx // no active transaction + UnexpectedActiveTx // unexpected active transaction ) // Error returns a user readable error message. diff --git a/vendor/github.com/elastic/go-txfile/pq/observe.go b/vendor/github.com/elastic/go-txfile/pq/observe.go new file mode 100644 index 000000000000..76f0440f523c --- /dev/null +++ b/vendor/github.com/elastic/go-txfile/pq/observe.go @@ -0,0 +1,73 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pq + +import "time" + +// Observer defines common callbacks to observe operations, outcomes and stats +// on queues. +// Each callback reports the header offset for uniquely identifying a queue in +// case a file holds many queues. +type Observer interface { + OnQueueInit(headerOffset uintptr, version uint32, available uint) + + OnQueueFlush(headerOffset uintptr, stats FlushStats) + + OnQueueRead(headerOffset uintptr, stats ReadStats) + + OnQueueACK(headerOffset uintptr, stats ACKStats) +} + +// FlushStats reports internal stats on the most recent flush operation. +type FlushStats struct { + Duration time.Duration // duration of flush operation + Oldest, Newest time.Time // timestamp of oldest/newest event in buffer + + Failed bool // set to true if flush operation failed + OutOfMemory bool // set to true if flush failed due to the file being full + + Pages uint // number of pages to be flushed + Allocate uint // number of pages to allocate during flush operation + Events uint // number of events to be flushed + + BytesTotal uint // total number of bytes written (ignoring headers, just event sizes) + BytesMin uint // size of 'smallest' event in current transaction + BytesMax uint // size of 'biggest' event in current transaction +} + +// ReadStats reports stats on the most recent transaction for reading events. +type ReadStats struct { + Duration time.Duration // duration of read transaction + + Skipped uint // number of events skipped (e.g. upon error while reading/parsing) + Read uint // number of events read + + BytesTotal uint // total number of bytes read (ignoring headers). Include partially but skipped events + BytesSkipped uint // number of event bytes skipped + BytesMin uint // size of 'smallest' event fully read in current transaction + BytesMax uint // size of 'biggest' event fully read in current transaction +} + +// ACKStats reports stats on the most recent ACK transaction. +type ACKStats struct { + Duration time.Duration + Failed bool + + Events uint // number of released events + Pages uint // number of released pages +} diff --git a/vendor/github.com/elastic/go-txfile/pq/pq.go b/vendor/github.com/elastic/go-txfile/pq/pq.go index 4dba15f3ad48..90367f977c4f 100644 --- a/vendor/github.com/elastic/go-txfile/pq/pq.go +++ b/vendor/github.com/elastic/go-txfile/pq/pq.go @@ -31,7 +31,9 @@ import ( type Queue struct { accessor access - id queueID + id queueID + version uint32 + hdrOffset uintptr // TODO: add support for multiple named readers with separate ACK handling. @@ -66,6 +68,8 @@ type Settings struct { // Optional ACK callback. Will be use to notify number of events being successfully // ACKed and pages being freed. ACKed func(event, pages uint) + + Observer Observer } // MakeRoot prepares the queue header (empty queue). @@ -113,7 +117,6 @@ func New(delegate Delegate, settings Settings) (*Queue, error) { root := castQueueRootPage(rootBuf[:]) if root.version.Get() != queueVersion { - cause := &Error{ kind: InitFailed, msg: fmt.Sprintf("queue version %v", root.version.Get()), @@ -123,9 +126,23 @@ func New(delegate Delegate, settings Settings) (*Queue, error) { tracef("open queue: %p (pageSize: %v)\n", q, pageSize) traceQueueHeader(root) + + q.version = root.version.Get() + q.hdrOffset = q.accessor.RootFileOffset() + q.onInit() return q, nil } +func (q *Queue) onInit() { + o := q.settings.Observer + if o == nil { + return + } + + avail, _ := q.Active() + o.OnQueueInit(q.hdrOffset, q.version, avail) +} + // Close will try to flush the current write buffer, // but after closing the queue, no more reads or writes can be executed func (q *Queue) Close() error { @@ -194,7 +211,7 @@ func (q *Queue) Writer() (*Writer, error) { writeBuffer := q.settings.WriteBuffer flushed := q.settings.Flushed - writer, err := newWriter(&q.accessor, q.pagePool, writeBuffer, tail, flushed) + writer, err := newWriter(&q.accessor, q.hdrOffset, q.settings.Observer, q.pagePool, writeBuffer, tail, flushed) if err != nil { return nil, q.accessor.errWrap(op, err) } @@ -208,7 +225,7 @@ func (q *Queue) Writer() (*Writer, error) { // The reader is not thread safe. func (q *Queue) Reader() *Reader { if q.reader == nil { - q.reader = newReader(&q.accessor) + q.reader = newReader(q.settings.Observer, &q.accessor) } return q.reader } @@ -227,7 +244,7 @@ func (q *Queue) Active() (uint, error) { func (q *Queue) getAcker() *acker { if q.acker == nil { - q.acker = newAcker(&q.accessor, q.settings.ACKed) + q.acker = newAcker(&q.accessor, q.hdrOffset, q.settings.Observer, q.settings.ACKed) } return q.acker } diff --git a/vendor/github.com/elastic/go-txfile/pq/reader.go b/vendor/github.com/elastic/go-txfile/pq/reader.go index 501c2cbfb42a..e1d057a6aecd 100644 --- a/vendor/github.com/elastic/go-txfile/pq/reader.go +++ b/vendor/github.com/elastic/go-txfile/pq/reader.go @@ -18,6 +18,8 @@ package pq import ( + "time" + "github.com/elastic/go-txfile" "github.com/elastic/go-txfile/internal/invariant" ) @@ -29,22 +31,30 @@ type Reader struct { active bool tx *txfile.Tx + + hdrOff uintptr + observer Observer + txStart time.Time + stats ReadStats } type readState struct { - id uint64 - endID uint64 // id of next, yet unwritten event. - eventBytes int // number of unread bytes in current event + id uint64 + endID uint64 // id of next, yet unwritten event. + totEventBytes int // number of total bytes in current event + eventBytes int // number of unread bytes in current event cursor cursor } -func newReader(accessor *access) *Reader { +func newReader(observer Observer, accessor *access) *Reader { return &Reader{ active: true, accessor: accessor, + observer: observer, state: readState{ - eventBytes: -1, + eventBytes: -1, + totEventBytes: -1, cursor: cursor{ pageSize: accessor.PageSize(), }, @@ -64,15 +74,8 @@ func (r *Reader) Available() (uint, error) { return 0, r.errOf(op, err) } - var err reason - func() { - var tx *txfile.Tx - tx, err = r.beginTx() - if err == nil { - defer tx.Close() - err = r.updateQueueState(tx) - } - }() + tx := r.tx + err := r.updateQueueState(tx) if err != nil { return 0, r.errWrap(op, err) } @@ -89,12 +92,16 @@ func (r *Reader) Available() (uint, error) { func (r *Reader) Begin() error { const op = "pq/reader-begin" - if r.tx != nil { - r.tx.Close() + var sig ErrKind = NoError + switch { + case r.isClosed(): + sig = ReaderClosed + case r.isTxActive(): + sig = UnexpectedActiveTx } - if err := r.canRead(); err != NoError { - return r.errOf(op, err) + if sig != NoError { + return r.errOf(op, sig) } tx, err := r.beginTx() @@ -103,6 +110,8 @@ func (r *Reader) Begin() error { } r.tx = tx + r.txStart = time.Now() + r.stats = ReadStats{} // zero out last stats on begin return nil } @@ -113,6 +122,17 @@ func (r *Reader) Done() { } r.tx.Close() + + if r.state.eventBytes < 0 && r.state.totEventBytes > 0 { + // did read complete event -> adapt stats + r.adoptEventStats() + } + + r.stats.Duration = time.Since(r.txStart) + if o := r.observer; o != nil { + o.OnQueueRead(r.hdrOff, r.stats) + } + r.tx = nil } @@ -142,16 +162,6 @@ func (r *Reader) Read(b []byte) (int, error) { func (r *Reader) readInto(to []byte) ([]byte, reason) { tx := r.tx - if tx == nil { - t, err := r.beginTx() - if err != nil { - return nil, err - } - - tx = t - defer tx.Close() - } - n := r.state.eventBytes if L := len(to); L < n { n = L @@ -198,18 +208,10 @@ func (r *Reader) Next() (int, error) { } tx := r.tx - if tx == nil { - t, err := r.beginTx() - if err != nil { - return -1, r.errWrap(op, err) - } - - tx = t - defer tx.Close() - } - cursor := makeTxCursor(tx, r.accessor, &r.state.cursor) + r.adoptEventStats() + // in event? Skip contents if r.state.eventBytes > 0 { err := cursor.Skip(r.state.eventBytes) @@ -265,9 +267,42 @@ func (r *Reader) Next() (int, error) { } L := int(hdr.sz.Get()) r.state.eventBytes = L + r.state.totEventBytes = L return L, nil } +func (r *Reader) adoptEventStats() { + if r.state.totEventBytes < 0 { + // no active event + return + } + + // update stats: + skipping := r.state.eventBytes > 0 + + if skipping { + r.stats.Skipped++ + r.stats.BytesSkipped += uint(r.state.eventBytes) + r.stats.BytesTotal += uint(r.state.totEventBytes - r.state.eventBytes) + } else { + bytes := uint(r.state.totEventBytes) + r.stats.BytesTotal += bytes + if r.stats.Read == 0 { + r.stats.BytesMin = bytes + r.stats.BytesMax = bytes + } else { + if r.stats.BytesMin > bytes { + r.stats.BytesMin = bytes + } + if r.stats.BytesMax < bytes { + r.stats.BytesMax = bytes + } + } + + r.stats.Read++ + } +} + func (r *Reader) updateQueueState(tx *txfile.Tx) reason { const op = "pq/reader-update-queue-state" @@ -308,12 +343,23 @@ func (r *Reader) beginTx() (*txfile.Tx, reason) { } func (r *Reader) canRead() ErrKind { - if !r.active { + if r.isClosed() { return ReaderClosed } + if !r.isTxActive() { + return InactiveTx + } return NoError } +func (r *Reader) isClosed() bool { + return !r.active +} + +func (r *Reader) isTxActive() bool { + return r.tx != nil +} + func (r *Reader) err(op string) *Error { return &Error{op: op, ctx: r.errCtx()} } diff --git a/vendor/github.com/elastic/go-txfile/pq/writer.go b/vendor/github.com/elastic/go-txfile/pq/writer.go index dc7d1b4bc99f..dbb3c73960f3 100644 --- a/vendor/github.com/elastic/go-txfile/pq/writer.go +++ b/vendor/github.com/elastic/go-txfile/pq/writer.go @@ -18,8 +18,12 @@ package pq import ( + "time" + "github.com/elastic/go-txfile" "github.com/elastic/go-txfile/internal/cleanup" + "github.com/elastic/go-txfile/internal/invariant" + "github.com/elastic/go-txfile/txerr" ) // Writer is used to push new events onto the queue. @@ -30,6 +34,9 @@ import ( type Writer struct { active bool + hdrOffset uintptr + observer Observer + accessor *access flushCB func(uint) @@ -45,12 +52,19 @@ type writeState struct { eventID uint64 eventBytes int + + activeEventBytes uint + minEventSize uint + maxEventSize uint + tsOldest, tsNewest time.Time } const defaultMinPages = 5 func newWriter( accessor *access, + off uintptr, + o Observer, pagePool *pagePool, writeBuffer uint, end position, @@ -68,6 +82,9 @@ func newWriter( pages = defaultMinPages } + tracef("create queue writer with initBufferSize=%v, actualBufferSize=%v, pageSize=%v, pages=%v\n", + writeBuffer, pageSize*pages, pageSize, pages) + var tail *page if end.page != 0 { traceln("writer load endpage: ", end) @@ -85,8 +102,10 @@ func newWriter( } w := &Writer{ - active: true, - accessor: accessor, + active: true, + hdrOffset: off, + observer: o, + accessor: accessor, state: writeState{ buf: newBuffer(pagePool, tail, pages, pageSize, szEventPageHeader), eventID: end.id, @@ -106,7 +125,7 @@ func (w *Writer) close() error { return nil } - err := w.doFlush() + err := w.flushBuffer() if err != nil { return w.errWrap(op, err) } @@ -124,7 +143,7 @@ func (w *Writer) Write(p []byte) (int, error) { } if w.state.buf.Avail() <= len(p) { - if err := w.doFlush(); err != nil { + if err := w.flushBuffer(); err != nil { return 0, w.errWrap(op, err) } } @@ -151,13 +170,32 @@ func (w *Writer) Next() error { hdr.sz.Set(uint32(w.state.eventBytes)) w.state.buf.CommitEvent(w.state.eventID) w.state.buf.ReserveHdr(szEventHeader) + + sz := uint(w.state.eventBytes) + ts := time.Now() + w.state.activeEventBytes += sz + if w.state.activeEventCount == 0 { + w.state.minEventSize = sz + w.state.maxEventSize = sz + w.state.tsOldest = ts + w.state.tsNewest = ts + } else { + if sz < w.state.minEventSize { + w.state.minEventSize = sz + } + if sz > w.state.maxEventSize { + w.state.maxEventSize = sz + } + w.state.tsNewest = ts + } + w.state.eventBytes = 0 w.state.eventID++ w.state.activeEventCount++ // check if we need to flush if w.state.buf.Avail() <= szEventHeader { - if err := w.doFlush(); err != nil { + if err := w.flushBuffer(); err != nil { return w.errWrap(op, err) } } @@ -174,40 +212,94 @@ func (w *Writer) Flush() error { return w.errOf(op, err) } - if err := w.doFlush(); err != nil { + if err := w.flushBuffer(); err != nil { return w.errWrap(op, err) } + return nil +} + +func (w *Writer) flushBuffer() error { + activeEventCount := w.state.activeEventCount + + start := time.Now() + pages, allocated, err := w.doFlush() + + if o := w.observer; o != nil { + failed := err != nil + o.OnQueueFlush(w.hdrOffset, FlushStats{ + Duration: time.Since(start), + Oldest: w.state.tsOldest, + Newest: w.state.tsNewest, + Failed: failed, + OutOfMemory: failed && (txerr.Is(txfile.OutOfMemory, err) || + txerr.Is(txfile.NoDiskSpace, err)), + Pages: pages, + Allocate: allocated, + Events: activeEventCount, + BytesTotal: w.state.activeEventBytes, + BytesMin: w.state.minEventSize, + BytesMax: w.state.maxEventSize, + }) + } + + if err != nil { + return err + } + + // reset internal stats on success + w.state.totalEventCount += activeEventCount + w.state.totalAllocPages += allocated + + traceln("Write buffer flushed. Total events: %v, total pages allocated: %v", + w.state.totalEventCount, + w.state.totalAllocPages) + + w.state.activeEventCount = 0 + w.state.activeEventBytes = 0 + w.state.minEventSize = 0 + w.state.maxEventSize = 0 + + if w.flushCB != nil { + w.flushCB(activeEventCount) + } return nil } -func (w *Writer) doFlush() error { - start, end := w.state.buf.Pages() +func (w *Writer) doFlush() (pages, allocated uint, err error) { + start, end, pages := w.state.buf.Pages() if start == nil || start == end { - return nil + return 0, 0, nil } traceln("writer flush", w.state.activeEventCount) + tracef("flush page range: start=%p, end=%p, n=%v\n", start, end, pages) // unallocated points to first page in list that must be allocated. All // pages between unallocated and end require a new page to be allocated. var unallocated *page + allocated = pages for current := start; current != end; current = current.Next { + tracef("check page assigned: %p (%v)\n", current, current.Assigned()) + if !current.Assigned() { unallocated = current break } + allocated-- } + tracef("start allocating pages from %p (n=%v)\n", unallocated, allocated) + tx, txErr := w.accessor.BeginWrite() if txErr != nil { - return w.errWrap("", txErr) + return pages, allocated, w.errWrap("", txErr) } defer tx.Close() rootPage, queueHdr, err := w.accessor.LoadRootPage(tx) if err != nil { - return w.errWrap("", err) + return pages, allocated, w.errWrap("", err) } traceQueueHeader(queueHdr) @@ -215,15 +307,19 @@ func (w *Writer) doFlush() error { ok := false allocN, txErr := allocatePages(tx, unallocated, end) if txErr != nil { - return w.errWrap("", txErr) + return pages, allocated, w.errWrap("", txErr) } + + traceln("allocated pages:", allocN) + invariant.Checkf(allocN == allocated, "allocation counter mismatch (expected=%v, actual=%v)", allocated, allocN) + linkPages(start, end) defer cleanup.IfNot(&ok, func() { unassignPages(unallocated, end) }) traceln("write queue pages") last, txErr := flushPages(tx, start, end) if txErr != nil { - return w.errWrap("", txErr) + return pages, allocated, w.errWrap("", txErr) } // update queue root @@ -232,7 +328,7 @@ func (w *Writer) doFlush() error { txErr = tx.Commit() if txErr != nil { - return w.errWrap("", txErr) + return pages, allocated, w.errWrap("", txErr) } // mark write as success -> no error-cleanup required @@ -244,24 +340,10 @@ func (w *Writer) doFlush() error { } w.state.buf.Reset(last) - - activeEventCount := w.state.activeEventCount - w.state.totalEventCount += activeEventCount - w.state.totalAllocPages += uint(allocN) - - traceln("Write buffer flushed. Total events: %v, total pages allocated: %v", - w.state.totalEventCount, - w.state.totalAllocPages) - - w.state.activeEventCount = 0 - if w.flushCB != nil { - w.flushCB(activeEventCount) - } - - return nil + return pages, allocated, nil } -func (w *Writer) updateRootHdr(hdr *queuePage, start, last *page, allocated int) { +func (w *Writer) updateRootHdr(hdr *queuePage, start, last *page, allocated uint) { if hdr.head.offset.Get() == 0 { w.accessor.WritePosition(&hdr.head, position{ page: start.Meta.ID, @@ -313,19 +395,19 @@ func (w *Writer) errPageCtx(id txfile.PageID) errorCtx { return w.accessor.errPageCtx(id) } -func allocatePages(tx *txfile.Tx, start, end *page) (int, error) { +func allocatePages(tx *txfile.Tx, start, end *page) (uint, error) { if start == nil { return 0, nil } - allocN := 0 + var allocN uint for current := start; current != end; current = current.Next { allocN++ } tracef("allocate %v queue pages\n", allocN) - txPages, err := tx.AllocN(allocN) + txPages, err := tx.AllocN(int(allocN)) if err != nil { return 0, err } diff --git a/vendor/github.com/elastic/go-txfile/tx.go b/vendor/github.com/elastic/go-txfile/tx.go index 7a2738318d5d..211d76247d29 100644 --- a/vendor/github.com/elastic/go-txfile/tx.go +++ b/vendor/github.com/elastic/go-txfile/tx.go @@ -19,6 +19,7 @@ package txfile import ( "sync" + "time" "github.com/elastic/go-txfile/internal/cleanup" "github.com/elastic/go-txfile/internal/invariant" @@ -45,6 +46,16 @@ type Tx struct { // scheduled WAL updates wal txWalState + + // transaction stats + tsStart time.Time + accessStats txAccessStats +} + +type txAccessStats struct { + New uint + Read uint + Update uint } // TxOptions adds some per transaction options user can set. @@ -109,6 +120,96 @@ func newTx(file *File, id uint64, lock sync.Locker, settings TxOptions) *Tx { return tx } +func (tx *Tx) onBegin() { + o := tx.file.observer + if o == nil { + return + } + + tx.tsStart = time.Now() + o.OnTxBegin(tx.flags.readonly) +} + +// onClose is called when a readonly transaction is closed. +func (tx *Tx) onClose() { + o := tx.file.observer + if o == nil { + return + } + + accessed := tx.accessStats.Read + o.OnTxClose(tx.file.stats, TxStats{ + Readonly: true, + Duration: time.Since(tx.tsStart), + Total: accessed, + Accessed: accessed, + }) +} + +// onRollback is called when a writable transaction is closed or rolled back without commit. +func (tx *Tx) onRollback() { + o := tx.file.observer + if o == nil { + return + } + + read := tx.accessStats.Read + updated := tx.accessStats.Update + new := tx.accessStats.New + + o.OnTxClose(tx.file.stats, TxStats{ + Readonly: false, + Commit: false, + Duration: time.Since(tx.tsStart), + Total: read + updated + new, + Accessed: read, + Updated: updated, + Written: updated + new, + Allocated: tx.alloc.stats.data.alloc, + Freed: tx.alloc.stats.data.freed, + }) +} + +// onCommit is called after a writable transaction did succeed. +func (tx *Tx) onCommit() { + allocStats := &tx.alloc.stats + + fileStats := &tx.file.stats + fileStats.Size = uint64(tx.file.sizeEstimate) + fileStats.MetaArea = tx.file.allocator.metaTotal + fileStats.MetaAllocated = tx.file.allocator.metaTotal - tx.file.allocator.meta.freelist.Avail() + fileStats.DataAllocated += allocStats.data.alloc - allocStats.data.freed - allocStats.toMeta + + o := tx.file.observer + if o == nil { + return + } + + read := tx.accessStats.Read + updated := tx.accessStats.Update + new := tx.accessStats.New + + o.OnTxClose(tx.file.stats, TxStats{ + Readonly: false, + Commit: true, + Duration: time.Since(tx.tsStart), + Total: read + updated + new, + Accessed: read, + Allocated: allocStats.data.alloc - allocStats.toMeta, + Freed: allocStats.data.freed, + Written: updated + new, + Updated: updated, + }) +} + +// onAccess is called when a the memory page pointer is requested. +func (tx *Tx) onAccess() { + tx.accessStats.Read++ +} + +func (tx *Tx) onWALTransfer(n int) { // number of wal pages copied into data area +} + // Writable returns true if the transaction supports file modifications. func (tx *Tx) Writable() bool { return !tx.flags.readonly @@ -276,7 +377,7 @@ func (tx *Tx) doCheckpointWAL() { for i := range ids { id, walID := ids[i], walIDS[i] - contents := tx.file.mmapedPage(walID) + contents := tx.access(walID) if contents == nil { panic("invalid WAL mapping") } @@ -291,6 +392,7 @@ func (tx *Tx) doCheckpointWAL() { tx.freeWALID(id, walID) } + tx.onWALTransfer(len(ids)) tx.flags.checkpoint = true } @@ -300,10 +402,12 @@ func (tx *Tx) finishWith(fn func() reason) reason { } defer tx.close() - if !tx.flags.readonly { - return fn() + if tx.flags.readonly { + tx.onClose() + return nil } - return nil + + return fn() } func (tx *Tx) close() { @@ -321,12 +425,14 @@ func (tx *Tx) commitChanges() reason { defer cleanup.IfNot(&commitOK, tx.rollbackChanges) err := tx.tryCommitChanges() - if commitOK = err == nil; !commitOK { + commitOK = err == nil + if !commitOK { return err } traceMetaPage(tx.file.getMetaPage()) - return err + tx.onCommit() + return nil } // tryCommitChanges attempts to write flush all pages written and update the @@ -358,10 +464,6 @@ func (tx *Tx) tryCommitChanges() reason { pending, exclusive := tx.file.locks.Pending(), tx.file.locks.Exclusive() - newMetaBuf := tx.prepareMetaBuffer() - newMeta := newMetaBuf.cast() - newMeta.root.Set(tx.rootID) // update data root - // give concurrent read transactions a chance to complete, but don't allow // for new read transactions to start while executing the commit pending.Lock() @@ -391,38 +493,14 @@ func (tx *Tx) tryCommitChanges() reason { return err } - var csAlloc allocCommitState - tx.file.allocator.fileCommitPrepare(&csAlloc, &tx.alloc, false) - - // 2. allocate new file pages for new meta data to be written - if err := tx.file.wal.fileCommitAlloc(tx, &csWAL); err != nil { - return err - } - csAlloc.updated = csAlloc.updated || len(csWAL.allocRegions) > 0 - - if err := tx.file.allocator.fileCommitAlloc(&csAlloc); err != nil { - return err - } + csAlloc := tx.commitPrepareAlloc() - // 3. serialize page mappings and new freelist - err = tx.file.wal.fileCommitSerialize(&csWAL, uint(tx.PageSize()), tx.scheduleWrite) + // 2. - 5. Commit changes to file + metaID, err := tx.tryCommitChangesToFile(&csWAL, &csAlloc) if err != nil { return err } - err = tx.file.allocator.fileCommitSerialize(&csAlloc, tx.scheduleWrite) - if err != nil { - return err - } - - // 4. sync all new contents and metadata before updating the ondisk meta page. - tx.file.writer.Sync(tx.writeSync, syncDataOnly) - - // 5. finalize on-disk transaction by writing new meta page. - tx.file.wal.fileCommitMeta(newMeta, &csWAL) - tx.file.allocator.fileCommitMeta(newMeta, &csAlloc) - metaID := tx.syncNewMeta(&newMetaBuf) - // 6. wait for all pages beeing written and synced, // before updating in memory state. err = tx.writeSync.Wait() @@ -471,6 +549,13 @@ func (tx *Tx) tryCommitChanges() reason { err = tx.file.truncate(requiredFileSz) } else if int(expectedMMapSize) > len(tx.file.mapped) { err = tx.file.mmapUpdate() + } else { + sz := expectedMMapSize + if sz < tx.file.size { + sz = tx.file.size + } + + tx.file.sizeEstimate = sz } if err != nil { return err @@ -488,6 +573,48 @@ func (tx *Tx) tryCommitChanges() reason { return nil } +func (tx *Tx) tryCommitChangesToFile( + csWAL *walCommitState, + csAlloc *allocCommitState, +) (metaID int, err reason) { + newMetaBuf := tx.prepareMetaBuffer() + newMeta := newMetaBuf.cast() + newMeta.root.Set(tx.rootID) // update data root + + // 2. allocate new file pages for new meta data to be written + if err := tx.file.wal.fileCommitAlloc(tx, csWAL); err != nil { + return metaID, err + } + csAlloc.updated = csAlloc.updated || len(csWAL.allocRegions) > 0 + + if err := tx.file.allocator.fileCommitAlloc(csAlloc); err != nil { + return metaID, err + } + + // 3. serialize page mappings and new freelist + err = tx.file.wal.fileCommitSerialize(csWAL, uint(tx.PageSize()), tx.scheduleCommitMetaWrite) + if err != nil { + return metaID, err + } + + err = tx.file.allocator.fileCommitSerialize(csAlloc, tx.scheduleCommitMetaWrite) + if err != nil { + return metaID, err + } + + // 4. sync all new contents and metadata before updating the ondisk meta page. + tx.file.writer.Sync(tx.writeSync, syncDataOnly) + + // 5. finalize on-disk transaction by writing new meta page. + tx.file.wal.fileCommitMeta(newMeta, csWAL) + tx.file.allocator.fileCommitMeta(newMeta, csAlloc) + metaID = tx.syncNewMeta(&newMetaBuf) + + // 6. wait for all pages beeing written and synced, + // before updating in memory state. + return metaID, nil +} + func checkTruncate( st *txAllocState, sz, mmapSz, maxSz int64, @@ -552,15 +679,36 @@ func (tx *Tx) commitPrepareWAL() (walCommitState, reason) { } if st.updated { - tx.metaAllocator().FreeRegions(&tx.alloc, tx.file.wal.metaPages) + tx.freeMetaRegions(tx.file.wal.metaPages) } return st, nil } +func (tx *Tx) commitPrepareAlloc() (state allocCommitState) { + tx.file.allocator.fileCommitPrepare(&state, &tx.alloc, false) + if state.updated { + tx.freeMetaRegions(tx.file.allocator.freelistPages) + } + return state +} + +func (tx *Tx) freeMetaRegions(rl regionList) { + tx.metaAllocator().FreeRegions(&tx.alloc, rl) +} + func (tx *Tx) access(id PageID) []byte { + tx.onAccess() return tx.file.mmapedPage(id) } +// scheduleCommitMetaWrite is used to schedule a page write for the file meta +// data like free list or page mappings. scheduleCommitMetaWrite must only be +// used during file updates in the commit phase. +func (tx *Tx) scheduleCommitMetaWrite(id PageID, buf []byte) reason { + tx.accessStats.New++ + return tx.scheduleWrite(id, buf) +} + func (tx *Tx) scheduleWrite(id PageID, buf []byte) reason { tx.file.writer.Schedule(tx.writeSync, id, buf) return nil @@ -583,6 +731,9 @@ func (tx *Tx) scheduleWrite(id PageID, buf []byte) reason { // - Truncate file only if pages in overflow area have been allocated. // - If maxSize == 0, truncate file to old end marker. func (tx *Tx) rollbackChanges() { + tracef("rollback changes in transaction: %p\n", tx) + tx.onRollback() + tx.file.allocator.Rollback(&tx.alloc) maxPages := tx.file.allocator.maxPages @@ -664,7 +815,7 @@ func (tx *Tx) Alloc() (page *Page, err error) { } err = tx.allocPagesWith(op, 1, func(p *Page) { page = p }) - return + return page, err } // AllocN allocates n potentially non-contious, yet empty pages. @@ -687,6 +838,7 @@ func (tx *Tx) AllocN(n int) (pages []*Page, err error) { if err != nil { return nil, err } + return pages, nil } @@ -714,6 +866,7 @@ func (tx *Tx) allocPagesWith(op string, n int, fn func(*Page)) reason { if count == 0 { return tx.err(op).of(OutOfMemory).reportf("not enough memory to allocate %v data page(s)", n) } + return nil } diff --git a/vendor/github.com/elastic/go-txfile/write.go b/vendor/github.com/elastic/go-txfile/write.go index e9f8578d93d9..5fa215764693 100644 --- a/vendor/github.com/elastic/go-txfile/write.go +++ b/vendor/github.com/elastic/go-txfile/write.go @@ -37,6 +37,8 @@ type writer struct { fsync []syncMsg fsync0 [8]syncMsg + syncMode SyncMode + pending int // number of scheduled writes since last sync published int // number of writes executed since last sync } @@ -90,8 +92,13 @@ const ( syncDataOnly ) -func (w *writer) Init(target writable, pageSize uint) { +func (w *writer) Init(target writable, pageSize uint, syncMode SyncMode) { + if syncMode == SyncDefault { + syncMode = SyncData + } + w.target = target + w.syncMode = syncMode w.pageSize = pageSize w.cond = sync.NewCond(&w.mux) w.scheduled = w.scheduled0[:0] @@ -175,24 +182,15 @@ func (w *writer) Run() (bool, reason) { // execute pending fsync: if fsync := cmd.fsync; fsync != nil { - const op = "txfile/write-sync" - - resetErr := cmd.syncFlags.Test(syncResetErr) if err == nil { - syncFlag := vfs.SyncAll - if cmd.syncFlags.Test(syncDataOnly) { - syncFlag = vfs.SyncDataOnly - } - - if syncErr := w.target.Sync(syncFlag); syncErr != nil { - err = errOp(op).causedBy(syncErr) - } + err = w.execSync(cmd) } fsync.err = err traceln("done fsync") fsync.Release() + resetErr := cmd.syncFlags.Test(syncResetErr) if resetErr { err = nil } @@ -200,6 +198,27 @@ func (w *writer) Run() (bool, reason) { } } +func (w *writer) execSync(cmd command) reason { + const op = "txfile/write-sync" + + syncFlag := vfs.SyncAll + switch w.syncMode { + case SyncNone: + return nil + + case SyncData: + if cmd.syncFlags.Test(syncDataOnly) { + syncFlag = vfs.SyncDataOnly + } + } + + if err := w.target.Sync(syncFlag); err != nil { + return errOp(op).causedBy(err) + } + + return nil +} + func (w *writer) nextCommand(buf []writeMsg) (command, bool) { w.mux.Lock() defer w.mux.Unlock() diff --git a/vendor/vendor.json b/vendor/vendor.json index 446fc2a79764..018e8e0d4a60 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -920,100 +920,124 @@ "revisionTime": "2019-01-07T12:18:35Z" }, { - "checksumSHA1": "tNszmkpuJYZMX8l8rlnvBDtoc1M=", + "checksumSHA1": "bNf3GDGhZh86bfCIMM5c5AYfo3g=", "path": "github.com/elastic/go-txfile", - "revision": "389b527ad365f6fc6cf5fa7e0ba5a2294ad2f3ed", - "revisionTime": "2018-08-07T16:52:25Z", - "version": "v0.0.3", - "versionExact": "v0.0.3" + "revision": "fa3d87c14381c01aaa1c7901877fb5e483294964", + "revisionTime": "2019-01-24T17:28:32Z", + "version": "v0.0.6", + "versionExact": "v0.0.6" + }, + { + "checksumSHA1": "scDqDI8APDj/tB973/ehmPufSLc=", + "path": "github.com/elastic/go-txfile/dev-tools/lib/mage/gotool", + "revision": "fa3d87c14381c01aaa1c7901877fb5e483294964", + "revisionTime": "2019-01-24T17:28:32Z", + "version": "v0.0.6", + "versionExact": "v0.0.6" + }, + { + "checksumSHA1": "/WzE2caCHChUpoRAlXDU5uLTU5I=", + "path": "github.com/elastic/go-txfile/dev-tools/lib/mage/mgenv", + "revision": "fa3d87c14381c01aaa1c7901877fb5e483294964", + "revisionTime": "2019-01-24T17:28:32Z", + "version": "v0.0.6", + "versionExact": "v0.0.6" + }, + { + "checksumSHA1": "kD/TFYofNqdOjb0nMN0w3LC/nNU=", + "path": "github.com/elastic/go-txfile/dev-tools/lib/mage/xbuild", + "revision": "fa3d87c14381c01aaa1c7901877fb5e483294964", + "revisionTime": "2019-01-24T17:28:32Z", + "version": "v0.0.6", + "versionExact": "v0.0.6" }, { "checksumSHA1": "re2W5hqGml/Q8vnx+DT3ooUNWxo=", "path": "github.com/elastic/go-txfile/internal/cleanup", - "revision": "389b527ad365f6fc6cf5fa7e0ba5a2294ad2f3ed", - "revisionTime": "2018-08-07T16:52:25Z", - "version": "v0.0.3", - "versionExact": "v0.0.3" + "revision": "fa3d87c14381c01aaa1c7901877fb5e483294964", + "revisionTime": "2019-01-24T17:28:32Z", + "version": "v0.0.6", + "versionExact": "v0.0.6" }, { "checksumSHA1": "HjNNDapvfXgOJqs7l7pS3ho6SSI=", "path": "github.com/elastic/go-txfile/internal/invariant", - "revision": "389b527ad365f6fc6cf5fa7e0ba5a2294ad2f3ed", - "revisionTime": "2018-08-07T16:52:25Z", - "version": "v0.0.3", - "versionExact": "v0.0.3" + "revision": "fa3d87c14381c01aaa1c7901877fb5e483294964", + "revisionTime": "2019-01-24T17:28:32Z", + "version": "v0.0.6", + "versionExact": "v0.0.6" }, { "checksumSHA1": "HLMF+V6Pt3YLUNOgmd2nR+vz9vM=", "path": "github.com/elastic/go-txfile/internal/iter", - "revision": "389b527ad365f6fc6cf5fa7e0ba5a2294ad2f3ed", - "revisionTime": "2018-08-07T16:52:25Z", - "version": "v0.0.3", - "versionExact": "v0.0.3" + "revision": "fa3d87c14381c01aaa1c7901877fb5e483294964", + "revisionTime": "2019-01-24T17:28:32Z", + "version": "v0.0.6", + "versionExact": "v0.0.6" }, { "checksumSHA1": "EAIqvdq5S3FNBoTBAI/U02AwTSU=", "path": "github.com/elastic/go-txfile/internal/strbld", - "revision": "389b527ad365f6fc6cf5fa7e0ba5a2294ad2f3ed", - "revisionTime": "2018-08-07T16:52:25Z", - "version": "v0.0.3", - "versionExact": "v0.0.3" + "revision": "fa3d87c14381c01aaa1c7901877fb5e483294964", + "revisionTime": "2019-01-24T17:28:32Z", + "version": "v0.0.6", + "versionExact": "v0.0.6" }, { "checksumSHA1": "0ThnGUs4X5RsvRRyNt31TqcOxLM=", "path": "github.com/elastic/go-txfile/internal/tracelog", - "revision": "389b527ad365f6fc6cf5fa7e0ba5a2294ad2f3ed", - "revisionTime": "2018-08-07T16:52:25Z", - "version": "v0.0.3", - "versionExact": "v0.0.3" + "revision": "fa3d87c14381c01aaa1c7901877fb5e483294964", + "revisionTime": "2019-01-24T17:28:32Z", + "version": "v0.0.6", + "versionExact": "v0.0.6" }, { "checksumSHA1": "lejstOrGPfa+tJohvIOK/AjdLa4=", "path": "github.com/elastic/go-txfile/internal/vfs", - "revision": "389b527ad365f6fc6cf5fa7e0ba5a2294ad2f3ed", - "revisionTime": "2018-08-07T16:52:25Z", - "version": "v0.0.3", - "versionExact": "v0.0.3" + "revision": "fa3d87c14381c01aaa1c7901877fb5e483294964", + "revisionTime": "2019-01-24T17:28:32Z", + "version": "v0.0.6", + "versionExact": "v0.0.6" }, { "checksumSHA1": "Wqp2VCpbcmfOFuZJrYkaxpvQQrE=", "path": "github.com/elastic/go-txfile/internal/vfs/osfs", - "revision": "389b527ad365f6fc6cf5fa7e0ba5a2294ad2f3ed", - "revisionTime": "2018-08-07T16:52:25Z", - "version": "v0.0.3", - "versionExact": "v0.0.3" + "revision": "fa3d87c14381c01aaa1c7901877fb5e483294964", + "revisionTime": "2019-01-24T17:28:32Z", + "version": "v0.0.6", + "versionExact": "v0.0.6" }, { "checksumSHA1": "x+Zecu6NkXTdGlX2s6npntDOiCQ=", "path": "github.com/elastic/go-txfile/internal/vfs/osfs/osfstest", - "revision": "389b527ad365f6fc6cf5fa7e0ba5a2294ad2f3ed", - "revisionTime": "2018-08-07T16:52:25Z", - "version": "v0.0.3", - "versionExact": "v0.0.3" + "revision": "fa3d87c14381c01aaa1c7901877fb5e483294964", + "revisionTime": "2019-01-24T17:28:32Z", + "version": "v0.0.6", + "versionExact": "v0.0.6" }, { - "checksumSHA1": "NO6sRhSBLtJxWPpTvwWEqSQh65I=", + "checksumSHA1": "LYeqHmalUZgk3oOHtJyPOKlM/j4=", "path": "github.com/elastic/go-txfile/pq", - "revision": "389b527ad365f6fc6cf5fa7e0ba5a2294ad2f3ed", - "revisionTime": "2018-08-07T16:52:25Z", - "version": "v0.0.3", - "versionExact": "v0.0.3" + "revision": "fa3d87c14381c01aaa1c7901877fb5e483294964", + "revisionTime": "2019-01-24T17:28:32Z", + "version": "v0.0.6", + "versionExact": "v0.0.6" }, { "checksumSHA1": "fCx++6A9uzyCsDUanAIJb77u0MI=", "path": "github.com/elastic/go-txfile/txerr", - "revision": "389b527ad365f6fc6cf5fa7e0ba5a2294ad2f3ed", - "revisionTime": "2018-08-07T16:52:25Z", - "version": "v0.0.3", - "versionExact": "v0.0.3" + "revision": "fa3d87c14381c01aaa1c7901877fb5e483294964", + "revisionTime": "2019-01-24T17:28:32Z", + "version": "v0.0.6", + "versionExact": "v0.0.6" }, { "checksumSHA1": "YMpex/CiJrYKGH05rpQyzdUaU2E=", "path": "github.com/elastic/go-txfile/txfiletest", - "revision": "389b527ad365f6fc6cf5fa7e0ba5a2294ad2f3ed", - "revisionTime": "2018-08-07T16:52:25Z", - "version": "v0.0.3", - "versionExact": "v0.0.3" + "revision": "fa3d87c14381c01aaa1c7901877fb5e483294964", + "revisionTime": "2019-01-24T17:28:32Z", + "version": "v0.0.6", + "versionExact": "v0.0.6" }, { "checksumSHA1": "Yb61Nqnh+3igFci61hv9WYgk/hc=",