diff --git a/freezer/freezer_url.go b/freezer/freezer_url.go index 02467b1..7cd65f9 100644 --- a/freezer/freezer_url.go +++ b/freezer/freezer_url.go @@ -2,11 +2,13 @@ package freezer import ( "fmt" + "io" "net/url" "strconv" "strings" "time" + "github.com/hashicorp/go-multierror" "github.com/uw-labs/freezer" "github.com/uw-labs/straw" "github.com/uw-labs/substrate" @@ -44,14 +46,13 @@ func newFreezerSink(u *url.URL) (substrate.AsyncMessageSink, error) { } } - var streamstore straw.StreamStore - var err error + var ( + store straw.StreamStore + err error + ) switch u.Scheme { case "freezer+dir": - streamstore, err = strawOpen("file:///") - if err != nil { - return nil, err - } + store, err = strawOpen("file:///") case "freezer+s3": u1 := url.URL{ Scheme: "s3", @@ -73,27 +74,43 @@ func newFreezerSink(u *url.URL) (substrate.AsyncMessageSink, error) { } u1.RawQuery = newVals.Encode() + store, err = strawOpen(u1.String()) - streamstore, err = strawOpen(u1.String()) - if err != nil { - return nil, err - } default: return nil, fmt.Errorf("unsupported scheme : %s", u.Scheme) } + if err != nil { + return nil, err + } conf := AsyncMessageSinkConfig{ - StreamStore: streamstore, + StreamStore: store, MaxUnflushedMessages: maxUnflushed, FreezerConfig: freezer.MessageSinkConfig{ Path: u.Path, CompressionType: ct, }, } - return sinker(conf) + sink, err := sinker(conf) + if err != nil { + return nil, err + } + return closeStoreSink{ + AsyncMessageSink: sink, + store: store, + }, nil } var sinker = NewAsyncMessageSink +type closeStoreSink struct { + substrate.AsyncMessageSink + store straw.StreamStore +} + +func (sink closeStoreSink) Close() (err error) { + return closeAll([]io.Closer{sink.AsyncMessageSink, sink.store}) +} + func newFreezerSource(u *url.URL) (substrate.AsyncMessageSource, error) { q := u.Query() @@ -107,21 +124,17 @@ func newFreezerSource(u *url.URL) (substrate.AsyncMessageSource, error) { return nil, fmt.Errorf("unknown compression type : %s", cts) } + var ( + store straw.StreamStore + err error + ) + switch u.Scheme { case "freezer+dir": - ss, err := strawOpen("file:///") + store, err = strawOpen("file:///") if err != nil { return nil, err } - conf := AsyncMessageSourceConfig{ - StreamStore: ss, - FreezerConfig: freezer.MessageSourceConfig{ - Path: u.Path, - PollPeriod: 10 * time.Second, - CompressionType: ct, - }, - } - return sourcer(conf) case "freezer+s3": u1 := url.URL{Scheme: "s3", Host: u.Hostname()} @@ -139,26 +152,51 @@ func newFreezerSource(u *url.URL) (substrate.AsyncMessageSource, error) { } } u1.RawQuery = newVals.Encode() + store, err = strawOpen(u1.String()) - ss, err := strawOpen(u1.String()) - if err != nil { - return nil, err - } - conf := AsyncMessageSourceConfig{ - StreamStore: ss, - FreezerConfig: freezer.MessageSourceConfig{ - Path: u.Path, - PollPeriod: 10 * time.Second, - CompressionType: ct, - }, - } - return sourcer(conf) default: return nil, fmt.Errorf("unsupported scheme : %s", u.Scheme) } + if err != nil { + return nil, err + } + conf := AsyncMessageSourceConfig{ + StreamStore: store, + FreezerConfig: freezer.MessageSourceConfig{ + Path: u.Path, + PollPeriod: 10 * time.Second, + CompressionType: ct, + }, + } + source, err := sourcer(conf) + if err != nil { + return nil, err + } + return &closeStoreSource{ + AsyncMessageSource: source, + store: store, + }, nil } var sourcer = NewAsyncMessageSource +type closeStoreSource struct { + substrate.AsyncMessageSource + store straw.StreamStore +} + +func (sink closeStoreSource) Close() (err error) { + return closeAll([]io.Closer{sink.AsyncMessageSource, sink.store}) +} + var strawOpen = straw.Open + +func closeAll(closers []io.Closer) (outErr error) { + for _, c := range closers { + if err := c.Close(); err != nil { + outErr = multierror.Append(outErr, err) + } + } + return outErr +}