diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 32700e3659f..e6cb328d18d 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -7,38 +7,38 @@ "Deps": [ { "ImportPath": "github.com/elastic/libbeat/beat", - "Comment": "v1.0.0-beta4-53-g2155fb7", - "Rev": "2155fb77c3140e94eaa3050ef8315468696833b4" + "Comment": "v1.0.0-beta4-83-gd9d28e4", + "Rev": "d9d28e4e8f5ff1faf4f53163dfc31852c3101239" }, { "ImportPath": "github.com/elastic/libbeat/cfgfile", - "Comment": "v1.0.0-beta4-53-g2155fb7", - "Rev": "2155fb77c3140e94eaa3050ef8315468696833b4" + "Comment": "v1.0.0-beta4-83-gd9d28e4", + "Rev": "d9d28e4e8f5ff1faf4f53163dfc31852c3101239" }, { "ImportPath": "github.com/elastic/libbeat/common", - "Comment": "v1.0.0-beta4-53-g2155fb7", - "Rev": "2155fb77c3140e94eaa3050ef8315468696833b4" + "Comment": "v1.0.0-beta4-83-gd9d28e4", + "Rev": "d9d28e4e8f5ff1faf4f53163dfc31852c3101239" }, { "ImportPath": "github.com/elastic/libbeat/logp", - "Comment": "v1.0.0-beta4-53-g2155fb7", - "Rev": "2155fb77c3140e94eaa3050ef8315468696833b4" + "Comment": "v1.0.0-beta4-83-gd9d28e4", + "Rev": "d9d28e4e8f5ff1faf4f53163dfc31852c3101239" }, { "ImportPath": "github.com/elastic/libbeat/outputs", - "Comment": "v1.0.0-beta4-53-g2155fb7", - "Rev": "2155fb77c3140e94eaa3050ef8315468696833b4" + "Comment": "v1.0.0-beta4-83-gd9d28e4", + "Rev": "d9d28e4e8f5ff1faf4f53163dfc31852c3101239" }, { "ImportPath": "github.com/elastic/libbeat/publisher", - "Comment": "v1.0.0-beta4-53-g2155fb7", - "Rev": "2155fb77c3140e94eaa3050ef8315468696833b4" + "Comment": "v1.0.0-beta4-83-gd9d28e4", + "Rev": "d9d28e4e8f5ff1faf4f53163dfc31852c3101239" }, { "ImportPath": "github.com/elastic/libbeat/service", - "Comment": "v1.0.0-beta4-53-g2155fb7", - "Rev": "2155fb77c3140e94eaa3050ef8315468696833b4" + "Comment": "v1.0.0-beta4-83-gd9d28e4", + "Rev": "d9d28e4e8f5ff1faf4f53163dfc31852c3101239" }, { "ImportPath": "github.com/garyburd/redigo/internal", diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/logp/log.go b/Godeps/_workspace/src/github.com/elastic/libbeat/logp/log.go index ea7f8522f8c..5bfac072c90 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/logp/log.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/logp/log.go @@ -45,15 +45,20 @@ func debugMessage(calldepth int, selector, format string, v ...interface{}) { return } } - if _log.toSyslog { - _log.syslog[LOG_INFO].Output(calldepth, fmt.Sprintf(format, v...)) - } - if _log.toStderr { - _log.logger.Output(calldepth, fmt.Sprintf("DBG "+format, v...)) - } - if _log.toFile { - _log.rotator.WriteLine([]byte(fmt.Sprintf("DBG "+format, v...))) - } + + send(calldepth+1, LOG_DEBUG, "DBG ", format, v...) + } +} + +func send(calldepth int, level Priority, prefix string, format string, v ...interface{}) { + if _log.toSyslog { + _log.syslog[level].Output(calldepth, fmt.Sprintf(format, v...)) + } + if _log.toStderr { + _log.logger.Output(calldepth, fmt.Sprintf(prefix+format, v...)) + } + if _log.toFile { + _log.rotator.WriteLine([]byte(fmt.Sprintf(prefix+format, v...))) } } @@ -73,15 +78,7 @@ func IsDebug(selector string) bool { func msg(level Priority, prefix string, format string, v ...interface{}) { if _log.level >= level { - if _log.toSyslog { - _log.syslog[level].Output(3, fmt.Sprintf(format, v...)) - } - if _log.toStderr { - _log.logger.Output(3, fmt.Sprintf(prefix+format, v...)) - } - if _log.toFile { - _log.rotator.WriteLine([]byte(fmt.Sprintf(prefix+format, v...))) - } + send(4, level, prefix, format, v...) } } @@ -130,24 +127,20 @@ func LogInit(level Priority, prefix string, toSyslog bool, toStderr bool, debugS } if _log.toSyslog { - for prio := LOG_EMERG; prio <= LOG_DEBUG; prio++ { - _log.syslog[prio] = openSyslog(prio, prefix) - if _log.syslog[prio] == nil { - // syslog not available - _log.toSyslog = false - break - } - } + SetToSyslog(true, prefix) } + if _log.toStderr { - _log.logger = log.New(os.Stderr, prefix, log.Lshortfile) + SetToStderr(true, prefix) } } func SetToStderr(toStderr bool, prefix string) { _log.toStderr = toStderr if _log.toStderr { - _log.logger = log.New(os.Stderr, prefix, log.Lshortfile) + // Add timestamp + flag := log.Ldate | log.Ltime | log.Lmicroseconds | log.LUTC | log.Lshortfile + _log.logger = log.New(os.Stderr, prefix, flag) } } @@ -156,6 +149,11 @@ func SetToSyslog(toSyslog bool, prefix string) { if _log.toSyslog { for prio := LOG_EMERG; prio <= LOG_DEBUG; prio++ { _log.syslog[prio] = openSyslog(prio, prefix) + if _log.syslog[prio] == nil { + // syslog not available + _log.toSyslog = false + break + } } } } diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/api.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/api.go index 4bc4832be89..db7c30d13f6 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/api.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/api.go @@ -83,84 +83,90 @@ func readCountResult(obj []byte) (*CountResults, error) { // searchable. In case id is empty, a new id is created over a HTTP POST request. // Otherwise, a HTTP PUT request is issued. // Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html -func (es *Client) Index( +func (es *Connection) Index( index, docType, id string, params map[string]string, body interface{}, -) (*QueryResult, error) { +) (int, *QueryResult, error) { method := "PUT" if id == "" { method = "POST" } - resp, err := es.apiCall(method, index, docType, id, params, body) + status, resp, err := es.apiCall(method, index, docType, id, params, body) if err != nil { - return nil, err + return status, nil, err } - return readQueryResult(resp) + result, err := readQueryResult(resp) + return status, result, err } // Refresh an index. Call this after doing inserts or creating/deleting // indexes in unit tests. -func (es *Client) Refresh(index string) (*QueryResult, error) { - resp, err := es.apiCall("POST", index, "", "_refresh", nil, nil) +func (es *Connection) Refresh(index string) (int, *QueryResult, error) { + status, resp, err := es.apiCall("POST", index, "", "_refresh", nil, nil) if err != nil { - return nil, err + return status, nil, err } - return readQueryResult(resp) + result, err := readQueryResult(resp) + return status, result, err } // CreateIndex creates a new index, optionally with settings and mappings passed in // the body. // Implements: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html // -func (es *Client) CreateIndex(index string, body interface{}) (*QueryResult, error) { - resp, err := es.apiCall("PUT", index, "", "", nil, body) +func (es *Connection) CreateIndex(index string, body interface{}) (int, *QueryResult, error) { + status, resp, err := es.apiCall("PUT", index, "", "", nil, body) if err != nil { - return nil, err + return status, nil, err } - return readQueryResult(resp) + result, err := readQueryResult(resp) + return status, result, err } // Delete deletes a typed JSON document from a specific index based on its id. // Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html -func (es *Client) Delete(index string, docType string, id string, params map[string]string) (*QueryResult, error) { - resp, err := es.apiCall("DELETE", index, docType, id, params, nil) +func (es *Connection) Delete(index string, docType string, id string, params map[string]string) (int, *QueryResult, error) { + status, resp, err := es.apiCall("DELETE", index, docType, id, params, nil) if err != nil { - return nil, err + return status, nil, err } - return readQueryResult(resp) + result, err := readQueryResult(resp) + return status, result, err } // A search request can be executed purely using a URI by providing request parameters. // Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html -func (es *Client) SearchURI(index string, docType string, params map[string]string) (*SearchResults, error) { - resp, err := es.apiCall("GET", index, docType, "_search", params, nil) +func (es *Connection) SearchURI(index string, docType string, params map[string]string) (int, *SearchResults, error) { + status, resp, err := es.apiCall("GET", index, docType, "_search", params, nil) if err != nil { - return nil, err + return status, nil, err } - return readSearchResult(resp) + result, err := readSearchResult(resp) + return status, result, err } -func (es *Client) CountSearchURI( +func (es *Connection) CountSearchURI( index string, docType string, params map[string]string, -) (*CountResults, error) { - resp, err := es.apiCall("GET", index, docType, "_count", params, nil) +) (int, *CountResults, error) { + status, resp, err := es.apiCall("GET", index, docType, "_count", params, nil) if err != nil { - return nil, err + return status, nil, err } - return readCountResult(resp) + result, err := readCountResult(resp) + return status, result, err } -func (es *Client) apiCall( +func (es *Connection) apiCall( method, index, docType, id string, params map[string]string, body interface{}, -) ([]byte, error) { +) (int, []byte, error) { path, err := makePath(index, docType, id) if err != nil { - return nil, err + return 0, nil, err } return es.request(method, path, params, body) } diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/bulkapi.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/bulkapi.go index 87a3d6f8a15..5ce149142c4 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/bulkapi.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/bulkapi.go @@ -27,6 +27,10 @@ type bulkMetaIndex struct { DocType string `json:"_type"` } +type BulkResult struct { + Items []json.RawMessage `json:"items"` +} + func (r *bulkRequest) Send(meta, obj interface{}) error { var err error @@ -40,19 +44,20 @@ func (r *bulkRequest) Send(meta, obj interface{}) error { return err } -func (r *bulkRequest) Flush() (*QueryResult, error) { +func (r *bulkRequest) Flush() (int, *BulkResult, error) { if r.buf.Len() == 0 { logp.Debug("elasticsearch", "Empty channel. Wait for more data.") - return nil, nil + return 0, nil, nil } - resp, err := r.es.sendBulkRequest("POST", r.path, r.params, &r.buf) + status, resp, err := r.es.sendBulkRequest("POST", r.path, r.params, &r.buf) if err != nil { - return nil, err + return status, nil, err } r.buf.Truncate(0) - return readQueryResult(resp) + result, err := readBulkResult(resp) + return status, result, err } // Bulk performs many index/delete operations in a single API call. @@ -90,7 +95,7 @@ func (conn *Connection) BulkWith( return nil, nil } - resp, err := conn.sendBulkRequest("POST", path, params, &buf) + _, resp, err := conn.sendBulkRequest("POST", path, params, &buf) if err != nil { return nil, err } @@ -120,7 +125,7 @@ func (conn *Connection) sendBulkRequest( method, path string, params map[string]string, buf *bytes.Buffer, -) ([]byte, error) { +) (int, []byte, error) { url := makeURL(conn.URL, path, params) logp.Debug("elasticsearch", "Sending bulk request to %s", url) @@ -154,3 +159,16 @@ func bulkEncode(metaBuilder MetaBuilder, body []interface{}) bytes.Buffer { } return buf } + +func readBulkResult(obj []byte) (*BulkResult, error) { + if obj == nil { + return nil, nil + } + + var result BulkResult + err := json.Unmarshal(obj, &result) + if err != nil { + return nil, err + } + return &result, nil +} diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/client.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/client.go index 06e59fb0c31..7f465da0a2c 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/client.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/client.go @@ -12,6 +12,7 @@ import ( "github.com/elastic/libbeat/common" "github.com/elastic/libbeat/logp" + "github.com/elastic/libbeat/outputs/mode" ) type Client struct { @@ -64,44 +65,129 @@ func (client *Client) Clone() *Client { func (client *Client) PublishEvents( events []common.MapStr, -) (n int, err error) { +) ([]common.MapStr, error) { if !client.connected { - return 0, ErrNotConnected + return events, ErrNotConnected } + // new request to store all events into request, err := client.startBulkRequest("", "", nil) if err != nil { - logp.Err( - "Failed to perform many index operations in a single API call: %s", - err) - return 0, err + logp.Err("Failed to perform any bulk index operations: %s", err) + return events, err } + // encode events into bulk request buffer, dropping failed elements from + // events slice + events = bulkEncodePublishRequest(request, client.index, events) + if len(events) == 0 { + return nil, nil + } + + // send bulk request + _, res, err := request.Flush() + if err != nil { + logp.Err("Failed to perform any bulk index operations: %s", err) + return events, err + } + + // check response for transient errors + events = bulkCollectPublishFails(res, events) + if len(events) > 0 { + return events, mode.ErrTempBulkFailure + } + + return nil, nil +} + +// fillBulkRequest encodes all bulk requests and returns slice of events +// successfully added to bulk request. +func bulkEncodePublishRequest( + requ *bulkRequest, + index string, + events []common.MapStr, +) []common.MapStr { + okEvents := events[:0] for _, event := range events { - ts := time.Time(event["@timestamp"].(common.Time)) - index := fmt.Sprintf("%s-%d.%02d.%02d", - client.index, ts.Year(), ts.Month(), ts.Day()) - meta := bulkMeta{ - Index: bulkMetaIndex{ - Index: index, - DocType: event["type"].(string), - }, - } - err := request.Send(meta, event) + meta := eventBulkMeta(index, event) + err := requ.Send(meta, event) if err != nil { logp.Err("Failed to encode event: %s", err) + continue } + + okEvents = append(okEvents, event) } + return okEvents +} - _, err = request.Flush() +func eventBulkMeta(index string, event common.MapStr) bulkMeta { + ts := time.Time(event["@timestamp"].(common.Time)).UTC() + index = fmt.Sprintf("%s-%d.%02d.%02d", index, + ts.Year(), ts.Month(), ts.Day()) + meta := bulkMeta{ + Index: bulkMetaIndex{ + Index: index, + DocType: event["type"].(string), + }, + } + return meta +} + +// bulkCollectPublishFails checks per item errors returning all events +// to be tried again due to error code returned for that items. If indexing an +// event failed due to some error in the event itself (e.g. does not respect mapping), +// the event will be dropped. +func bulkCollectPublishFails( + res *BulkResult, + events []common.MapStr, +) []common.MapStr { + failed := events[:0] + for i, rawItem := range res.Items { + status, msg, err := itemStatus(rawItem) + if err != nil { + logp.Info("Failed to parse bulk reponse for item (%i): %v", i, err) + // add index if response parse error as we can not determine success/fail + failed = append(failed, events[i]) + continue + } + + if status < 300 { + continue // ok value + } + + if status < 500 && status != 429 { + // hard failure, don't collect + logp.Warn("Can not index event (status=%v): %v", status, msg) + continue + } + + debug("Failed to insert data(%v): %v", i, events[i]) + logp.Info("Bulk item insert failed (i=%v, status=%v): %v", i, status, msg) + failed = append(failed, events[i]) + } + return failed +} + +func itemStatus(m json.RawMessage) (int, string, error) { + var item map[string]struct { + Status int `json:"status"` + Error string `json:"error"` + } + + err := json.Unmarshal(m, &item) if err != nil { - logp.Err( - "Failed to perform many index operations in a single API call; %s", - err) - return 0, err + logp.Err("Failed to parse bulk response item: %s", err) + return 0, "", err + } + + for _, r := range item { + return r.Status, r.Error, nil } - return len(events), nil + err = ErrResponseRead + logp.Err("%v", err) + return 0, "", err } func (client *Client) PublishEvent(event common.MapStr) error { @@ -115,16 +201,30 @@ func (client *Client) PublishEvent(event common.MapStr) error { logp.Debug("output_elasticsearch", "Publish event: %s", event) // insert the events one by one - _, err := client.Index(index, event["type"].(string), "", nil, event) + status, _, err := client.Index(index, event["type"].(string), "", nil, event) if err != nil { logp.Warn("Fail to insert a single event: %s", err) + if err == ErrJSONEncodeFailed { + // don't retry unencodable values + return nil + } + } + switch { + case status == 0: // event was not send yet + return nil + case status >= 500 || status == 429: // server error, retry + return err + case status >= 300 && status < 500: + // won't be able to index event in Elasticsearch => don't retry + return nil } + return nil } func (conn *Connection) Connect(timeout time.Duration) error { var err error - conn.connected, err = conn.Ping() + conn.connected, err = conn.Ping(timeout) if err != nil { return err } @@ -134,8 +234,8 @@ func (conn *Connection) Connect(timeout time.Duration) error { return nil } -func (conn *Connection) Ping() (bool, error) { - conn.http.Timeout = defaultEsOpenTimeout +func (conn *Connection) Ping(timeout time.Duration) (bool, error) { + conn.http.Timeout = timeout resp, err := conn.http.Head(conn.URL) if err != nil { return false, err @@ -159,7 +259,7 @@ func (conn *Connection) request( method, path string, params map[string]string, body interface{}, -) ([]byte, error) { +) (int, []byte, error) { url := makeURL(conn.URL, path, params) logp.Debug("elasticsearch", "%s %s %s", method, url, body) @@ -168,7 +268,7 @@ func (conn *Connection) request( var err error obj, err = json.Marshal(body) if err != nil { - return nil, ErrJSONEncodeFailed + return 0, nil, ErrJSONEncodeFailed } } @@ -178,11 +278,11 @@ func (conn *Connection) request( func (conn *Connection) execRequest( method, url string, body io.Reader, -) ([]byte, error) { +) (int, []byte, error) { req, err := http.NewRequest(method, url, body) if err != nil { logp.Warn("Failed to create request", err) - return nil, err + return 0, nil, err } req.Header.Add("Accept", "application/json") @@ -193,22 +293,22 @@ func (conn *Connection) execRequest( resp, err := conn.http.Do(req) if err != nil { conn.connected = false - return nil, err + return 0, nil, err } defer closing(resp.Body) status := resp.StatusCode if status >= 300 { conn.connected = false - return nil, fmt.Errorf("%v", resp.Status) + return status, nil, fmt.Errorf("%v", resp.Status) } obj, err := ioutil.ReadAll(resp.Body) if err != nil { conn.connected = false - return nil, err + return status, nil, err } - return obj, nil + return status, obj, nil } func closing(c io.Closer) { diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/output.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/output.go index 5e754673add..9f2a7c6f3d4 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/output.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/output.go @@ -19,14 +19,17 @@ var ( // ErrJSONEncodeFailed indicates encoding failures ErrJSONEncodeFailed = errors.New("json encode failed") + + // ErrResponseRead indicates error parsing Elasticsearch response + ErrResponseRead = errors.New("bulk item status parse failed.") ) const ( - defaultEsOpenTimeout = 3000 * time.Millisecond - defaultMaxRetries = 3 - elasticsearchDefaultTimeout = 30 * time.Second + defaultBulkSize = 50 + + elasticsearchDefaultTimeout = 90 * time.Second ) func init() { @@ -66,6 +69,12 @@ func (out *elasticsearchOutput) init( return err } + // configure bulk size in config in case it is not set + if config.BulkMaxSize == nil { + bulkSize := defaultBulkSize + config.BulkMaxSize = &bulkSize + } + clients, err := mode.MakeClients(config, makeClientFactory(beat, tlsConfig, config)) if err != nil { return err @@ -93,7 +102,8 @@ func (out *elasticsearchOutput) init( } else { loadBalance := config.LoadBalance == nil || *config.LoadBalance if loadBalance { - m, err = mode.NewLoadBalancerMode(clients, maxRetries, waitRetry, timeout) + m, err = mode.NewLoadBalancerMode(clients, maxRetries, + waitRetry, timeout, maxWaitRetry) } else { m, err = mode.NewFailOverConnectionMode(clients, maxRetries, waitRetry, timeout) } diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/topology.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/topology.go index 32ba0e31cac..f4c388e9719 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/topology.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/topology.go @@ -54,8 +54,8 @@ func (t *topology) EnableTTL() error { // already exists. If index could not be created, next api call to index will // fail anyway. index := ".packetbeat-topology" - _, _ = client.CreateIndex(index, nil) - _, err := client.Index(index, "server-ip", "_mapping", nil, setting) + _, _, _ = client.CreateIndex(index, nil) + _, _, err := client.Index(index, "server-ip", "_mapping", nil, setting) if err != nil { return err } @@ -96,7 +96,7 @@ func (t *topology) PublishIPs(name string, localAddrs []string) error { "ttl": fmt.Sprintf("%dms", t.TopologyExpire), "refresh": "true", } - _, err := client.Index( + _, _, err := client.Index( ".packetbeat-topology", //index "server-ip", //type name, // id @@ -126,14 +126,14 @@ func loadTopolgyMap(client *Client) (map[string]string, error) { docType := "server-ip" // get number of entries in index for search query to return all entries in one query - cntRes, err := client.CountSearchURI(index, docType, nil) + _, cntRes, err := client.CountSearchURI(index, docType, nil) if err != nil { logp.Err("Getting topology map fails with: %s", err) return nil, err } params := map[string]string{"size": strconv.Itoa(cntRes.Count)} - res, err := client.SearchURI(index, docType, params) + _, res, err := client.SearchURI(index, docType, params) if err != nil { logp.Err("Getting topology map fails with: %s", err) return nil, err diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/url.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/url.go index 0a2a00c5df0..68f760aca0a 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/url.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/url.go @@ -10,6 +10,11 @@ import ( // Creates the url based on the url configuration. // Adds missing parts with defaults (scheme, host, port) func getURL(defaultScheme string, defaultPath string, rawURL string) (string, error) { + + if defaultScheme == "" { + defaultScheme = "http" + } + addr, err := url.Parse(rawURL) if err != nil { return "", err diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/fileout/file.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/fileout/file.go index d2bda1477ea..b64e6a53eef 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/fileout/file.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/fileout/file.go @@ -42,7 +42,7 @@ func (out *fileOutput) init(beat string, config *outputs.MothershipConfig, topol // disable bulk support configDisableInt := -1 config.Flush_interval = &configDisableInt - config.Bulk_size = &configDisableInt + config.BulkMaxSize = &configDisableInt rotateeverybytes := uint64(config.Rotate_every_kb) * 1024 if rotateeverybytes == 0 { diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/client.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/client.go index e876186d581..f17d2830f24 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/client.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/client.go @@ -65,9 +65,11 @@ func (l *lumberjackClient) PublishEvent(event common.MapStr) error { return err } -func (l *lumberjackClient) PublishEvents(events []common.MapStr) (int, error) { +func (l *lumberjackClient) PublishEvents( + events []common.MapStr, +) ([]common.MapStr, error) { if len(events) == 0 { - return 0, nil + return nil, nil } // prepare message payload @@ -76,7 +78,7 @@ func (l *lumberjackClient) PublishEvents(events []common.MapStr) (int, error) { } count, payload, err := l.compressEvents(events) if err != nil { - return 0, err + return events, err } if count == 0 { @@ -84,17 +86,17 @@ func (l *lumberjackClient) PublishEvents(events []common.MapStr) (int, error) { // as exported so no one tries to send/encode the same events once again // The compress/encode function already prints critical per failed encoding // failure. - return len(events), nil + return nil, nil } // send window size: if err = l.sendWindowSize(count); err != nil { - return l.onFail(0, err) + return l.onFail(events, err) } // send payload if err = l.sendCompressed(payload); err != nil { - return l.onFail(0, err) + return l.onFail(events, err) } // wait for ACK (accept partial ACK to reset timeout) @@ -104,7 +106,7 @@ func (l *lumberjackClient) PublishEvents(events []common.MapStr) (int, error) { // read until all acks ackSeq, err = l.readACK() if err != nil { - return l.onFail(ackSeq, err) + return l.onFail(events[ackSeq:], err) } } @@ -127,23 +129,26 @@ func (l *lumberjackClient) PublishEvents(events []common.MapStr) (int, error) { } } - return len(events), nil + return nil, nil } -func (l *lumberjackClient) onFail(ackSeq uint32, err error) (int, error) { +func (l *lumberjackClient) onFail( + events []common.MapStr, + err error, +) ([]common.MapStr, error) { // if timeout error, back off and ignore error nerr, ok := err.(net.Error) if !ok || !nerr.Timeout() { // no timeout error, close connection and return error _ = l.Close() - return int(ackSeq), err + return events, err } // if we've seen 3 consecutive timeout errors, close connection l.countTimeoutErr++ if l.countTimeoutErr == maxAllowedTimeoutErr { _ = l.Close() - return int(ackSeq), err + return events, err } // timeout error. reduce window size and return 0 published events. Send @@ -153,7 +158,7 @@ func (l *lumberjackClient) onFail(ackSeq uint32, err error) (int, error) { if l.windowSize < minWindowSize { l.windowSize = minWindowSize } - return int(ackSeq), nil + return events, nil } func (l *lumberjackClient) compressEvents( diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/logstash.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/logstash.go index 480897a7cc8..3c51a547f4c 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/logstash.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/logstash.go @@ -42,11 +42,14 @@ type logstash struct { const ( logstashDefaultPort = 10200 - logstashDefaultTimeout = 30 * time.Second - defaultSendRetries = 3 + logstashDefaultTimeout = 30 * time.Second + logstasDefaultMaxTimeout = 90 * time.Second + defaultSendRetries = 3 ) var waitRetry = time.Duration(1) * time.Second + +// NOTE: maxWaitRetry has no effect on mode, as logstash client currently does not return ErrTempBulkFailure var maxWaitRetry = time.Duration(60) * time.Second func (lj *logstash) init( @@ -104,7 +107,8 @@ func (lj *logstash) init( } else { loadBalance := config.LoadBalance != nil && *config.LoadBalance if loadBalance { - m, err = mode.NewLoadBalancerMode(clients, sendRetries, waitRetry, timeout) + m, err = mode.NewLoadBalancerMode(clients, sendRetries, + waitRetry, timeout, maxWaitRetry) } else { m, err = mode.NewFailOverConnectionMode(clients, sendRetries, waitRetry, timeout) } diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/balance.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/balance.go index 72c4c96af29..7d5a5ffdd2a 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/balance.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/balance.go @@ -32,8 +32,9 @@ import ( // Distributing events to workers is subject to timeout. If no worker is available to // pickup a message for sending, the message will be dropped internally. type LoadBalancerMode struct { - timeout time.Duration // send/retry timeout. Every timeout is a failed send attempt - waitRetry time.Duration // duration to wait during re-connection attempts + timeout time.Duration // Send/retry timeout. Every timeout is a failed send attempt + waitRetry time.Duration // Duration to wait during re-connection attempts. + maxWaitRetry time.Duration // Maximum send/retry timeout in backoff case. // maximum number of configured send attempts. If set to 0, publisher will // block until event has been successfully published. @@ -65,12 +66,13 @@ type eventsMessage struct { func NewLoadBalancerMode( clients []ProtocolClient, maxAttempts int, - waitRetry, timeout time.Duration, + waitRetry, timeout, maxWaitRetry time.Duration, ) (*LoadBalancerMode, error) { m := &LoadBalancerMode{ - timeout: timeout, - waitRetry: waitRetry, - maxAttempts: maxAttempts, + timeout: timeout, + maxWaitRetry: maxWaitRetry, + waitRetry: waitRetry, + maxAttempts: maxAttempts, work: make(chan eventsMessage), retries: make(chan eventsMessage, len(clients)*2), @@ -172,24 +174,51 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) { return } } else { - published := 0 events := msg.events - send := 0 - for published < len(events) { - n, err := client.PublishEvents(events[published:]) - if err != nil { - // retry only non-confirmed subset of events in batch - msg.events = msg.events[published:] + total := len(events) + var backoffCount uint + + for len(events) > 0 { + var err error - // reset attempt count if subset of message has been send - if send > 0 { + events, err = client.PublishEvents(events) + if err != nil { + // reset attempt count if subset of messages has been processed + if len(events) < total { msg.attemptsLeft = m.maxAttempts + 1 } - m.onFail(msg, err) - return + + if err != ErrTempBulkFailure { + // retry non-published subset of events in batch + msg.events = events + m.onFail(msg, err) + return + } + + msg.attemptsLeft-- + if m.maxAttempts > 0 && msg.attemptsLeft <= 0 { + // no more attempts left => drop + outputs.SignalFailed(msg.signaler, err) + return + } + + // wait before retry + backoff := time.Duration(int64(m.waitRetry) * (1 << backoffCount)) + if backoff > m.maxWaitRetry { + backoff = m.maxWaitRetry + } else { + backoffCount++ + } + select { + case <-m.done: // shutdown + outputs.SignalFailed(msg.signaler, err) + return + case <-time.After(backoff): + } + + // reset total count for temporary failure loop + total = len(events) } - published += n - send++ } } outputs.SignalCompleted(msg.signaler) diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/failover.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/failover.go index d4597935e05..7a7255a6d3b 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/failover.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/failover.go @@ -105,10 +105,9 @@ func (f *FailOverConnectionMode) connect(active int) error { // unavailable. On failure PublishEvents tries to connect to another configured // connection by random. func (f *FailOverConnectionMode) PublishEvents( - trans outputs.Signaler, + signaler outputs.Signaler, events []common.MapStr, ) error { - published := 0 fails := 0 var err error @@ -121,18 +120,18 @@ func (f *FailOverConnectionMode) PublishEvents( } // loop until all events have been send in case client supports partial sends - for published < len(events) { + for len(events) > 0 { + var err error conn := f.conns[f.active] - n, err := conn.PublishEvents(events[published:]) + events, err = conn.PublishEvents(events) if err != nil { logp.Info("Error publishing events (retrying): %s", err) break } - published += n } - if published == len(events) { - outputs.SignalCompleted(trans) + if len(events) == 0 { + outputs.SignalCompleted(signaler) return nil } @@ -145,7 +144,7 @@ func (f *FailOverConnectionMode) PublishEvents( fails++ } - outputs.SignalFailed(trans, err) + outputs.SignalFailed(signaler, err) return nil } diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/mode.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/mode.go index 673faf24fb9..40918b83a98 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/mode.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/mode.go @@ -52,15 +52,19 @@ type ProtocolClient interface { // must be set. If connection has been lost, IsConnected must return false // in future calls. // PublishEvents is free to publish only a subset of given events, even in - // error case. On return n indicates the number of events guaranteed to be - // published. - PublishEvents(events []common.MapStr) (n int, err error) + // error case. On return nextEvents contains all events not yet published. + PublishEvents(events []common.MapStr) (nextEvents []common.MapStr, err error) // PublishEvent sends one event to the clients sink. On failure and error is // returned. PublishEvent(event common.MapStr) error } +var ( + // ErrTempBulkFailure indicates PublishEvents fail temporary to retry. + ErrTempBulkFailure = errors.New("temporary bulk send failure") +) + // MakeClients will create a list from of ProtocolClient instances from // outputer configuration host list and client factory function. func MakeClients( diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/single.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/single.go index 81a53e0393a..b94692dffcf 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/single.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/mode/single.go @@ -65,7 +65,6 @@ func (s *SingleConnectionMode) PublishEvents( signaler outputs.Signaler, events []common.MapStr, ) error { - published := 0 fails := 0 var backoffCount uint var err error @@ -76,18 +75,18 @@ func (s *SingleConnectionMode) PublishEvents( goto sendFail } - for published < len(events) { - n, err := s.conn.PublishEvents(events[published:]) + for len(events) > 0 { + var err error + events, err = s.conn.PublishEvents(events) if err != nil { logp.Info("Error publishing events (retrying): %s", err) break } fails = 0 - published += n } - if published == len(events) { + if len(events) == 0 { outputs.SignalCompleted(signaler) return nil } diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/outputs.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/outputs.go index 85af56c4a29..7d51b3c3b3f 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/outputs.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/outputs.go @@ -28,7 +28,7 @@ type MothershipConfig struct { Number_of_files int DataType string Flush_interval *int - Bulk_size *int + BulkMaxSize *int `yaml:"bulk_max_size"` Max_retries *int Pretty *bool TLS *TLSConfig diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/async.go b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/async.go index dcbcf2ba831..c67adcf2918 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/async.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/async.go @@ -47,8 +47,8 @@ func (p *asyncPublisher) onMessage(m message) { // be implemented in the furute. // If m.signal is nil, NewSplitSignaler will return nil -> signaler will // only set if client did send one - if m.signal != nil && len(p.outputs) > 1 { - m.signal = outputs.NewSplitSignaler(m.signal, len(p.outputs)) + if m.context.signal != nil && len(p.outputs) > 1 { + m.context.signal = outputs.NewSplitSignaler(m.context.signal, len(p.outputs)) } for _, o := range p.outputs { o.send(m) @@ -59,12 +59,12 @@ func (p *asyncPublisher) client() eventPublisher { return asyncClient(p.send) } -func (c asyncClient) PublishEvent(event common.MapStr) bool { - return c.send(message{event: event}) +func (c asyncClient) PublishEvent(ctx *context, event common.MapStr) bool { + return c.send(message{context: *ctx, event: event}) } -func (c asyncClient) PublishEvents(events []common.MapStr) bool { - return c.send(message{events: events}) +func (c asyncClient) PublishEvents(ctx *context, events []common.MapStr) bool { + return c.send(message{context: *ctx, events: events}) } func (c asyncClient) send(m message) bool { @@ -82,8 +82,8 @@ func asyncOutputer(ws *workerSignal, worker *outputWorker) worker { } maxBulkSize := defaultBulkSize - if config.Bulk_size != nil { - maxBulkSize = *config.Bulk_size + if config.BulkMaxSize != nil { + maxBulkSize = *config.BulkMaxSize } // batching disabled diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/bulk.go b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/bulk.go index 17b298a35b4..2389396bd69 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/bulk.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/bulk.go @@ -52,9 +52,9 @@ func (b *bulkWorker) run() { return case m := <-b.queue: if m.event != nil { // single event - b.onEvent(m.signal, m.event) + b.onEvent(m.context.signal, m.event) } else { // batch of events - b.onEvents(m.signal, m.events) + b.onEvents(m.context.signal, m.events) } // buffer full? @@ -108,8 +108,11 @@ func (b *bulkWorker) onEvents(signal outputs.Signaler, events []common.MapStr) { } func (b *bulkWorker) publish() { + // TODO: remember/merge and forward context options to output worker b.output.send(message{ - signal: outputs.NewCompositeSignaler(b.pending...), + context: context{ + signal: outputs.NewCompositeSignaler(b.pending...), + }, events: b.events, }) diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/client.go b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/client.go index f7a0d4429d4..1c944e90f5a 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/client.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/client.go @@ -24,11 +24,6 @@ type client struct { publisher *PublisherType } -type publishOptions struct { - confirm bool - sync bool -} - // ClientOption allows API users to set additional options when publishing events. type ClientOption func(option *publishOptions) @@ -47,14 +42,16 @@ func Sync(options *publishOptions) { } func (c *client) PublishEvent(event common.MapStr, opts ...ClientOption) bool { - return c.getClient(opts).PublishEvent(event) + options, client := c.getClient(opts) + return client.PublishEvent(&context{publishOptions: options}, event) } func (c *client) PublishEvents(events []common.MapStr, opts ...ClientOption) bool { - return c.getClient(opts).PublishEvents(events) + options, client := c.getClient(opts) + return client.PublishEvents(&context{publishOptions: options}, events) } -func (c *client) getClient(opts []ClientOption) eventPublisher { +func (c *client) getClient(opts []ClientOption) (publishOptions, eventPublisher) { debug("send event") options := publishOptions{} for _, opt := range opts { @@ -62,9 +59,9 @@ func (c *client) getClient(opts []ClientOption) eventPublisher { } if options.confirm { - return c.publisher.syncPublisher.client(!options.sync) + return options, c.publisher.syncPublisher.client() } - return c.publisher.asyncPublisher.client() + return options, c.publisher.asyncPublisher.client() } // PublishEvent will publish the event on the channel. Options will be ignored. diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/output.go b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/output.go index 3c5b5bd23f4..567ff42201c 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/output.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/output.go @@ -10,8 +10,9 @@ import ( type outputWorker struct { messageWorker - out outputs.BulkOutputer - config outputs.MothershipConfig + out outputs.BulkOutputer + config outputs.MothershipConfig + maxBulkSize int } func newOutputWorker( @@ -20,7 +21,16 @@ func newOutputWorker( ws *workerSignal, hwm int, ) *outputWorker { - o := &outputWorker{out: outputs.CastBulkOutputer(out), config: config} + maxBulkSize := defaultBulkSize + if config.BulkMaxSize != nil { + maxBulkSize = *config.BulkMaxSize + } + + o := &outputWorker{ + out: outputs.CastBulkOutputer(out), + config: config, + maxBulkSize: maxBulkSize, + } o.messageWorker.init(ws, hwm, o) return o } @@ -30,22 +40,79 @@ func (o *outputWorker) onStop() {} func (o *outputWorker) onMessage(m message) { if m.event != nil { - debug("output worker: publish single event") - ts := time.Time(m.event["@timestamp"].(common.Time)).UTC() - _ = o.out.PublishEvent(m.signal, ts, m.event) + o.onEvent(&m.context, m.event) } else { - if len(m.events) == 0 { - debug("output worker: no events to publish") - outputs.SignalCompleted(m.signal) - return + o.onBulk(&m.context, m.events) + } +} + +func (o *outputWorker) onEvent(ctx *context, event common.MapStr) { + debug("output worker: publish single event") + ts := time.Time(event["@timestamp"].(common.Time)).UTC() + + if !ctx.sync { + _ = o.out.PublishEvent(ctx.signal, ts, event) + return + } + + signal := outputs.NewSyncSignal() + for { + o.out.PublishEvent(signal, ts, event) + if signal.Wait() { + outputs.SignalCompleted(ctx.signal) + break } + } +} - debug("output worker: publish %v events", len(m.events)) - ts := time.Time(m.events[0]["@timestamp"].(common.Time)).UTC() - err := o.out.BulkPublish(m.signal, ts, m.events) +func (o *outputWorker) onBulk(ctx *context, events []common.MapStr) { + if len(events) == 0 { + debug("output worker: no events to publish") + outputs.SignalCompleted(ctx.signal) + return + } + var sync *outputs.SyncSignal + if ctx.sync { + sync = outputs.NewSyncSignal() + } + + if o.maxBulkSize < 0 || len(events) <= o.maxBulkSize { + o.sendBulk(sync, ctx, events) + return + } + + // start splitting bulk request + splits := (len(events) + (o.maxBulkSize - 1)) / o.maxBulkSize + ctx.signal = outputs.NewSplitSignaler(ctx.signal, splits) + for len(events) > 0 { + sz := o.maxBulkSize + if sz > len(events) { + sz = len(events) + } + o.sendBulk(sync, ctx, events[:sz]) + events = events[sz:] + } +} + +func (o *outputWorker) sendBulk( + sync *outputs.SyncSignal, + ctx *context, + events []common.MapStr, +) { + debug("output worker: publish %v events", len(events)) + ts := time.Time(events[0]["@timestamp"].(common.Time)).UTC() + + if sync == nil { + err := o.out.BulkPublish(ctx.signal, ts, events) if err != nil { logp.Info("Error bulk publishing events: %s", err) } + return + } + + for done := false; !done; done = sync.Wait() { + o.out.BulkPublish(sync, ts, events) } + outputs.SignalCompleted(ctx.signal) } diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/preprocess.go b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/preprocess.go index 54ed5c46a25..02f1cf43095 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/preprocess.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/preprocess.go @@ -66,7 +66,7 @@ func (p *preprocessor) onMessage(m message) { // return if no event is left if len(ignore) == len(events) { debug("no event left, complete send") - outputs.SignalCompleted(m.signal) + outputs.SignalCompleted(m.context.signal) return } @@ -84,15 +84,15 @@ func (p *preprocessor) onMessage(m message) { if publisher.disabled { debug("publisher disabled") - outputs.SignalCompleted(m.signal) + outputs.SignalCompleted(m.context.signal) return } debug("preprocessor forward") if single { - p.handler.onMessage(message{signal: m.signal, event: events[0]}) + p.handler.onMessage(message{context: m.context, event: events[0]}) } else { - p.handler.onMessage(message{signal: m.signal, events: events}) + p.handler.onMessage(message{context: m.context, events: events}) } } diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/publish.go b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/publish.go index a1678188281..b9e21aa738a 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/publish.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/publish.go @@ -27,8 +27,18 @@ var debug = logp.MakeDebug("publish") // EventPublisher provides the interface for beats to publish events. type eventPublisher interface { - PublishEvent(event common.MapStr) bool - PublishEvents(events []common.MapStr) bool + PublishEvent(ctx *context, event common.MapStr) bool + PublishEvents(ctx *context, events []common.MapStr) bool +} + +type context struct { + publishOptions + signal outputs.Signaler +} + +type publishOptions struct { + confirm bool + sync bool } type TransactionalEventPublisher interface { @@ -183,7 +193,7 @@ func (publisher *PublisherType) Init( output := plugin.Output config := plugin.Config - debug("create output worker: %p, %p", config.Flush_interval, config.Bulk_size) + debug("create output worker: %p, %p", config.Flush_interval, config.BulkMaxSize) outputers = append(outputers, newOutputWorker(config, output, &publisher.wsOutput, 1000)) diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/sync.go b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/sync.go index 40b86d6ead0..6e63eb18a2a 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/sync.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/sync.go @@ -18,42 +18,31 @@ func newSyncPublisher(pub *PublisherType) *syncPublisher { return s } -func (p *syncPublisher) client(confirmOnly bool) eventPublisher { - if confirmOnly { - return syncClient(p.forward) - } - return syncClient(p.forceForward) +func (p *syncPublisher) client() eventPublisher { + return syncClient(p.forward) } func (p *syncPublisher) onStop() {} func (p *syncPublisher) onMessage(m message) { - signal := outputs.NewSplitSignaler(m.signal, len(p.pub.Output)) - m.signal = signal + signal := outputs.NewSplitSignaler(m.context.signal, len(p.pub.Output)) + m.context.signal = signal for _, o := range p.pub.Output { o.send(m) } } -func (c syncClient) PublishEvent(event common.MapStr) bool { - return c(message{event: event}) +func (c syncClient) PublishEvent(ctx *context, event common.MapStr) bool { + return c(message{context: *ctx, event: event}) } -func (c syncClient) PublishEvents(events []common.MapStr) bool { - return c(message{events: events}) +func (c syncClient) PublishEvents(ctx *context, events []common.MapStr) bool { + return c(message{context: *ctx, events: events}) } func (p *syncPublisher) forward(m message) bool { sync := outputs.NewSyncSignal() - m.signal = sync + m.context.signal = sync p.send(m) return sync.Wait() } - -func (p *syncPublisher) forceForward(m message) bool { - for { - if ok := p.forward(m); ok { - return true - } - } -} diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/worker.go b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/worker.go index 2ff3324423a..40fd51a21ec 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/worker.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/worker.go @@ -18,9 +18,9 @@ type messageWorker struct { } type message struct { - signal outputs.Signaler - event common.MapStr - events []common.MapStr + context context + event common.MapStr + events []common.MapStr } type workerSignal struct { @@ -87,6 +87,6 @@ func (ws *workerSignal) Init() { func stopQueue(qu chan message) { close(qu) for msg := range qu { // clear queue and send fail signal - outputs.SignalFailed(msg.signal, nil) + outputs.SignalFailed(msg.context.signal, nil) } }