Skip to content

Commit

Permalink
[libbeat] [cleanup] make spool.{Spool, Settings} internal (elastic#16693
Browse files Browse the repository at this point in the history
)
  • Loading branch information
faec authored Mar 5, 2020
1 parent 9ff5bc3 commit 76b6bdc
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Deprecate test flags, `generate` and `update_expected`, in favor of `data`. {pull}15292[15292]
- Python 3 is required now to run python tests and tools. {pull}14798[14798]
- The type `memqueue.Broker` is no longer exported; instead of `memqueue.NewBroker`, call `memqueue.NewQueue` (which provides the same public interface). {pull}16667[16667]
- The disk spool types `spool.Spool` and `spool.Settings` have been renamed to the internal types `spool.diskSpool` and `spool.settings`. {pull}16693[16693]
- `queue.Eventer` has been renamed to `queue.ACKListener` {pull}16691[16691]
- Require logger as first parameter for `outputs.transport.transport#ProxyDialer` and `outputs.elasticsearch.client#BulkReadItemStatus`. {pull}16761[16761]

Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/spool/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func create(
log = defaultLogger()
}

return NewSpool(log, path, Settings{
return newDiskSpool(log, path, settings{
ACKListener: ackListener,
Mode: config.File.Permissions,
WriteBuffer: uint(config.Write.BufferSize),
Expand Down
26 changes: 13 additions & 13 deletions libbeat/publisher/queue/spool/spool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
"github.com/elastic/go-txfile/pq"
)

// Spool implements an on-disk queue.Queue.
type Spool struct {
// diskSpool implements an on-disk queue.Queue.
type diskSpool struct {
// producer/input support
inCtx *spoolCtx
inBroker *inBroker
Expand All @@ -53,8 +53,8 @@ type spoolCtx struct {
done chan struct{}
}

// Settings configure a new spool to be created.
type Settings struct {
// settings configure a new spool to be created.
type settings struct {
Mode os.FileMode

File txfile.Options
Expand All @@ -76,8 +76,8 @@ type Settings struct {
const minInFlushTimeout = 100 * time.Millisecond
const minOutFlushTimeout = 0 * time.Millisecond

// NewSpool creates and initializes a new file based queue.
func NewSpool(logger logger, path string, settings Settings) (*Spool, error) {
// newDiskSpool creates and initializes a new file based queue.
func newDiskSpool(logger logger, path string, settings settings) (*diskSpool, error) {
mode := settings.Mode
if mode == 0 {
mode = os.ModePerm
Expand Down Expand Up @@ -115,7 +115,7 @@ func NewSpool(logger logger, path string, settings Settings) (*Spool, error) {
return nil, err
}

spool := &Spool{
spool := &diskSpool{
inCtx: inCtx,
outCtx: outCtx,
}
Expand Down Expand Up @@ -159,7 +159,7 @@ func NewSpool(logger logger, path string, settings Settings) (*Spool, error) {
}

// Close shuts down the queue and closes the used file.
func (s *Spool) Close() error {
func (s *diskSpool) Close() error {
// stop all workers (waits for all workers to be finished)
s.outCtx.Close()
s.inCtx.Close()
Expand All @@ -174,33 +174,33 @@ func (s *Spool) Close() error {
}

// BufferConfig returns the queue initial buffer settings.
func (s *Spool) BufferConfig() queue.BufferConfig {
func (s *diskSpool) BufferConfig() queue.BufferConfig {
return queue.BufferConfig{Events: -1}
}

// Producer creates a new queue producer for publishing events.
func (s *Spool) Producer(cfg queue.ProducerConfig) queue.Producer {
func (s *diskSpool) Producer(cfg queue.ProducerConfig) queue.Producer {
return s.inBroker.Producer(cfg)
}

// Consumer creates a new queue consumer for consuming and acking events.
func (s *Spool) Consumer() queue.Consumer {
func (s *diskSpool) Consumer() queue.Consumer {
return s.outBroker.Consumer()
}

// onFlush is run whenever the queue signals it's write buffer being flushed.
// Flush events are forwarded to all workers.
// The onFlush callback is directly called by the queue writer (same go-routine)
// on Write or Flush operations.
func (s *Spool) onFlush(n uint) {
func (s *diskSpool) onFlush(n uint) {
s.inBroker.onFlush(n)
s.outBroker.onFlush(n)
}

// onACK is run whenever the queue signals events being acked and removed from
// the queue.
// ACK events are forwarded to all workers.
func (s *Spool) onACK(events, pages uint) {
func (s *diskSpool) onACK(events, pages uint) {
s.inBroker.onACK(events, pages)
}

Expand Down
8 changes: 4 additions & 4 deletions libbeat/publisher/queue/spool/spool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var seed int64
var debug bool

type testQueue struct {
*Spool
*diskSpool
teardown func()
}

Expand Down Expand Up @@ -104,7 +104,7 @@ func makeTestQueue(
logger = new(silentLogger)
}

spool, err := NewSpool(logger, path, Settings{
spool, err := newDiskSpool(logger, path, settings{
WriteBuffer: writeBuffer,
WriteFlushTimeout: flushTimeout,
Codec: codecCBORL,
Expand All @@ -119,13 +119,13 @@ func makeTestQueue(
t.Fatal(err)
}

tq := &testQueue{Spool: spool, teardown: cleanPath}
tq := &testQueue{diskSpool: spool, teardown: cleanPath}
return tq
}
}

func (t *testQueue) Close() error {
err := t.Spool.Close()
err := t.diskSpool.Close()
t.teardown()
return err
}
Expand Down

0 comments on commit 76b6bdc

Please sign in to comment.