http listener refactor #1915

Merged: sparrc merged 2 commits into master from http-listener-refactor on Oct 24, 2016

Conversation

@sparrc (Contributor) commented Oct 18, 2016

Required for all PRs:

  • CHANGELOG.md updated (we recommend not updating this until the PR has been approved by a maintainer)
  • Sign CLA (if not already signed)
  • README.md updated (if adding a new plugin)

in this commit:

  • chunks out the http request body to avoid making very large
    allocations.
  • establishes a limit for the maximum http request body size that the
    listener will accept.
  • utilizes a pool of byte buffers to reduce GC pressure.
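
For illustration, a rough sketch of what the chunked read with a size limit might look like (the handler and field names mirror the diff quoted below, but the body is a hedged sketch rather than the merged code):

func (t *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) {
	// reject bodies that exceed the configured limit up front
	if req.ContentLength > t.MaxBodySize {
		res.WriteHeader(http.StatusRequestEntityTooLarge)
		return
	}

	buf := t.pool.get() // fixed-size buffer from the pool, reused across requests
	defer t.pool.put(buf)

	for {
		n, err := io.ReadFull(req.Body, buf)
		if n > 0 {
			// parse buf[:n] as line protocol here, carrying any partial
			// trailing line over to the next read
		}
		if err != nil {
			// io.EOF or io.ErrUnexpectedEOF marks the end of the body
			break
		}
	}
	res.WriteHeader(http.StatusNoContent)
}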

t.acc = acc
t.pool = NewPool(100)

sparrc (Contributor Author):

Magic number, but it needs to be something

Review comment:

This can probably be lowered, or made configurable. As it stands, the current code will allocate 100MB+ of RAM for the HTTP buffers, and there is no way for the user to specify otherwise. When it comes to allocating machine resources, I think that should be a configurable option. Especially since our pool isn't leaky, and once it's full, the memory will never drop back down or be released to the OS.

@sparrc force-pushed the http-listener-refactor branch 2 times, most recently from c2889ea to 9400799 on October 18, 2016 15:36
write_timeout = "10s"

## Maximum allowed http request body size in bytes.
## 0 means to use the default of 1,000,000,000 bytes (1 gigabyte)

Review comment:

It appears that default is 512MB above, not 1 GB.

@joelegasse left a comment:

A few suggestions here and there.

toolarge and badrequest have enough in common that they should probably be replaced with a function similar to http.Error, taking the ResponseWriter, a status code, and a message.
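
Something along these lines would do it (a hedged sketch; the name writeError and the JSON body are assumptions, not the PR's actual helper):

func writeError(res http.ResponseWriter, code int, msg string) {
	// like http.Error: takes the ResponseWriter, a status code, and a message
	res.Header().Set("Content-Type", "application/json")
	res.WriteHeader(code)
	fmt.Fprintf(res, `{"error": %q}`, msg)
}

// usage (hypothetical call sites):
//   writeError(res, http.StatusRequestEntityTooLarge, "http request body too large")
//   writeError(res, http.StatusBadRequest, "unable to parse points")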

@@ -0,0 +1,34 @@
package http_listener

type pool struct {

Review comment:

This can just be type pool chan []byte, otherwise you're allocating a struct whose only field is a reference type.

Reply:

Either that, or also include the buffer size as a field in the struct. That would make it configurable at the type level, even if that doesn't get exposed as a user-facing config option.

buffers chan []byte
}

func NewPool(n int) *pool {

Review comment:

I would replace this with return make(pool, n); I don't think it needs to pre-allocate every slot.

case p.buffers <- b:
default:
// the pool is full, so drop this buffer
b = nil

Review comment:

This line does nothing. b will be out of scope once this function returns, which will be immediately after this block, anyway.
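
Put together, the channel-backed pool being suggested might look roughly like this (a sketch following the review suggestions; the 64 KB buffer size is illustrative):

package http_listener

// pool as a bare channel type, per the suggestion above
type pool chan []byte

func NewPool(n int) pool {
	// no need to pre-allocate every slot; buffers are created lazily in get()
	return make(pool, n)
}

func (p pool) get() []byte {
	select {
	case b := <-p:
		return b
	default:
		return make([]byte, 64*1024) // illustrative buffer size
	}
}

func (p pool) put(b []byte) {
	select {
	case p <- b:
	default:
		// the pool is full; just return and let the GC reclaim b
	}
}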

func (t *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) {
// Check that the content length is not too large for us to handle.
if req.ContentLength > t.MaxBodySize {
toolarge(res)

Review comment:

style nit: tooLarge and badRequest (below)

}

var return400 bool
var buf []byte

Review comment:

This should be a get() from the pool, and the same buffer should be used throughout the processing of a single request.

return
}

if err == io.ErrUnexpectedEOF || err == io.EOF || n == 0 {

Review comment:

io.ReadFull only returns io.EOF if no bytes were read, the n == 0 check can be removed.
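
For reference, io.ReadFull's documented contract makes that explicit (illustrative fragment):

n, err := io.ReadFull(req.Body, buf)
switch err {
case nil, io.ErrUnexpectedEOF:
	// buf[:n] holds data; ErrUnexpectedEOF means this was the final, partial chunk
case io.EOF:
	// io.ReadFull returns io.EOF only when it read zero bytes, so n is
	// guaranteed to be 0 here and a separate n == 0 check is redundant
default:
	// some other read error
}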

t.pool.put(buf)
newlinei := findnewline(body)
log.Printf("E! http_listener received a single line of %d bytes, maximum is %d bytes",
MAX_LINE_SIZE+newlinei, MAX_LINE_SIZE)

Review comment:

Use len(buf) here

}
// rotate the bit remaining after the last newline to the front of the buffer
bufstart = len(buf) - i
copy(buf[:bufstart], buf[i:])

Review comment:

copy(buf, buf[i:]) would also work here.

func findnewline(r io.Reader) int {
counter := 0
// read until the next newline:
var tmp [1]byte

Review comment:

Reading one byte at a time (especially if it results in a syscall) is very inefficient. This might be better captured with some state in the main parsing loop. var badLineBytes int, and if that's greater than zero, scan for a newline in the next chunk, log the error, and drop those bytes.
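
A rough sketch of that stateful approach (illustrative only; badLineBytes and the surrounding loop show one way it could be wired in, not the merged code):

var badLineBytes int
for {
	n, err := io.ReadFull(req.Body, buf)
	chunk := buf[:n]
	if badLineBytes > 0 {
		// still inside an oversized line: scan this chunk for its terminating newline
		if i := bytes.IndexByte(chunk, '\n'); i >= 0 {
			log.Printf("E! http_listener dropped an oversized line of %d bytes", badLineBytes+i)
			chunk = chunk[i+1:]
			badLineBytes = 0
		} else {
			badLineBytes += n
			if err != nil {
				break
			}
			continue
		}
	}
	// ... parse chunk as line protocol; when a single line exceeds the limit,
	// set badLineBytes and drop the bytes instead of reading one at a time ...
	if err != nil {
		break
	}
}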

@johnrengelman (Contributor) commented:

I know this is a WIP, but I've tried running it and I get run-away memory growth.

@joelegasse commented:

This change also doesn't preserve the timestamping of batches for points that don't provide a timestamp.

@sparrc (Contributor Author) commented Oct 18, 2016

@johnrengelman what size of batches are you writing in? approximate write load? approximate number of writers?

@johnrengelman (Contributor) commented:

We're batching at 1000 points coming out of the host machines; there are something like 200-300 host machines writing metrics (but this will need to grow to 4x this many machines). The host machines are posting their data to an AWS ELB, which is routing to 2 telegraf instances running the http_listener. Those instances are batching at 2000 points and writing out to influx.

According to the ELB statistics, we're seeing something like 20 MB/s and 26 req/s, but we're seeing a lot of i/o timeouts in the http_listener tier, so I'm not certain I believe these numbers. Most of this is running on shared hardware in a cluster, so this week I'm going to have to try and spin up dedicated hardware for this spot; it could be related to that.

I tried patching this change by removing the default case in the pool implementation (which creates a new buffer) and instead responding with an error. This prevents the unbounded memory growth, so it seems to be basically related to not being able to process inbound http requests fast enough. That being said, if I allow the memory to grow unbounded, I don't ever see i/o timeouts in this layer, but it seems memory is just never getting released back; this could be because the inbound data is just too much for the hardware we are on.

I'll report back once I run on some dedicated hardware.

Alternatively, do you have any suggestions for a different implementation for this? I was wondering if maybe I could run the influx_relay at this layer. Our initial issue was just too many machines connecting to influx directly, so this was our solution to abstract that, but I'm wondering if we're just going down the wrong path.

@sparrc (Contributor Author) commented Oct 18, 2016

@johnrengelman At the moment I wouldn't recommend going down that path. The telegraf write path has been designed to make metrics easy to modify and make it simple to write output plugins for widely different protocols and services. The tradeoff has been high write throughput and performance. When running on SSDs, I would say that telegraf performs around half as well as InfluxDB. Significant work would need to be undertaken to optimize the telegraf write path to the level that InfluxDB is at.

If your InfluxDB instance is not keeping up with 26 req/s and 200-300 machines, then it sounds like it might be running on some very lean hardware. Unfortunately switching to telegraf as your http listener probably isn't going to help, and probably will only make your issues worse.

I don't think relay will help much, as that project only relays the http writes and doesn't do batching. Maybe one of us should start an influxdb-batcher project ;-)

@johnrengelman (Contributor) commented:

That's an option I was discussing earlier today. In theory, since the line protocol is just line-delimited strings, it could be very efficient to just append them up to a certain size and then send, correct?

I think our issue on the influx side was overloading it with too many http connections coming from the hosts. Though to be fair, that system is being run by a different group and I know there is some "clever" nginx proxying going on, so it might have been at that layer.

@sparrc force-pushed the http-listener-refactor branch 6 times, most recently from ea70f6a to 3ed3b51 on October 21, 2016 13:38

@joelegasse left a comment:

A few minor things, and some logic issues in the processing code.

@@ -11,6 +13,8 @@ type Buffer struct {
drops int
// total metrics added
total int

sync.Mutex

Review comment:

This should probably be a named variable, mu, so that it can't be locked outside of this package.

Follow-up:

This should still be changed
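
A minimal sketch of the suggested shape (the Add signature and the elided fields are illustrative; the point is the named, unexported mu field):

type Buffer struct {
	// ... existing fields (drops, total, ...) ...

	mu sync.Mutex // named and unexported, so it can't be locked outside this package
}

func (b *Buffer) Add(metrics ...telegraf.Metric) {
	b.mu.Lock()
	defer b.mu.Unlock()
	// ...
}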

// MAX_LINE_SIZE is the maximum size, in bytes, that can be allocated for
// a single InfluxDB point.
// 64 KB
DEFAULT_MAX_LINE_SIZE = 64 * 1000

Review comment:

a power of two here might make the runtime happier, since for larger values, it will use memory pages directly (generally 4K), so 64 * 1024 would line up nicely.

var hangingBytes bool
buf := t.pool.get()
defer func() { t.pool.put(buf) }()
bufstart := 0

Review comment:

style: should be bufStart or just start

len(buf))
hangingBytes = true
return400 = true
continue

Review comment:

This should probably also reset start to zero.

return400 = true
}
// rotate the bit remaining after the last newline to the front of the buffer
bufstart = len(buf) - i

Review comment:

This should always just skip the newline, something like this:

i++ // start copying after the newline
start = len(buf) - i
if start > 0 {
    copy(buf, buf[i:])
}

@sparrc force-pushed the http-listener-refactor branch 3 times, most recently from 3cbcbc7 to e666485 on October 21, 2016 15:52
// MAX_LINE_SIZE is the maximum size, in bytes, that can be allocated for
// a single InfluxDB point.
// 64 KB
DEFAULT_MAX_LINE_SIZE = 64 * 1024
)

type HttpListener struct {

Review comment:

This should be HTTPListener

func (h *HttpListener) Start(acc telegraf.Accumulator) error {
h.mu.Lock()
defer h.mu.Unlock()
h.parser = influx.InfluxParser{}

Review comment:

This line looks unnecessary, unless I'm missing something. Is there some unexported state that needs to be reset if this HttpListener is reused?

sparrc (Contributor Author):

nope, you're right that it's unnecessary, I'll remove that line

var return400 bool
var hangingBytes bool
buf := h.pool.get()
defer func() { h.pool.put(buf) }()

Review comment:

defer h.pool.put(buf) will work, unless you want to delay evaluation of the captured variables, which is probably unnecessary
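
For reference, the distinction being drawn (illustrative fragment):

defer h.pool.put(buf)               // buf is evaluated now, when the defer statement executes
defer func() { h.pool.put(buf) }()  // buf is captured and evaluated later, when the function returns
// Assuming buf is not reassigned later in the handler, the simpler form behaves the same.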

@joelegasse left a comment:

A few minor things, and the one question about "resetting" the parser field. If there is a reason to force a zeroing of the field, add a comment.

@sparrc changed the title from "[WIP] http listener refactor" to "http listener refactor" on Oct 24, 2016
@sparrc merged commit c849b58 into master on Oct 24, 2016
@sparrc deleted the http-listener-refactor branch on October 25, 2016 13:42