Skip to content

Commit

Permalink
feat(ch): use new writer API to encode blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Aug 16, 2024
1 parent b23efa6 commit 716154a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 2 deletions.
6 changes: 6 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Client struct {
lg *zap.Logger
conn net.Conn
buf *proto.Buffer
writer *proto.Writer
reader *proto.Reader
info proto.ClientHello
server proto.ServerHello
Expand Down Expand Up @@ -500,6 +501,11 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {
c.compression = proto.CompressionDisabled
}

if _, ok := conn.(*net.TCPConn); writevAvailable && // writev available only on Unix platforms.
ok && c.compression == proto.CompressionDisabled { // Could not be used with TLS and compression.
c.writer = proto.NewWriter(c.conn, c.buf)
}

handshakeCtx, cancel := context.WithTimeout(ctx, opt.HandshakeTimeout)
defer cancel()
if err := c.handshake(handshakeCtx); err != nil {
Expand Down
28 changes: 26 additions & 2 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,18 @@ func (c *Client) encodeBlock(ctx context.Context, tableName string, input []prot
BucketNum: -1,
}
}
if err := b.EncodeBlock(c.buf, c.protocolVersion, input); err != nil {
return errors.Wrap(err, "encode")

if w := c.writer; w != nil {
if err := b.WriteBlock(w, c.protocolVersion, input); err != nil {
return err
}
if err := c.flushWritev(ctx); err != nil {
return errors.Wrap(err, "write buffers")
}
} else {
if err := b.EncodeBlock(c.buf, c.protocolVersion, input); err != nil {
return errors.Wrap(err, "encode")
}
}

// Performing compression.
Expand All @@ -311,6 +321,20 @@ func (c *Client) encodeBlock(ctx context.Context, tableName string, input []prot
return nil
}

func (c *Client) flushWritev(ctx context.Context) error {
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "context")
}
if deadline, ok := ctx.Deadline(); ok {
if err := c.conn.SetWriteDeadline(deadline); err != nil {
return errors.Wrap(err, "set write deadline")
}
// Reset deadline.
defer func() { _ = c.conn.SetWriteDeadline(time.Time{}) }()
}
return c.writer.Flush()
}

// encodeBlankBlock encodes block with zero columns and rows which is special
// case for "end of data".
func (c *Client) encodeBlankBlock(ctx context.Context) error {
Expand Down
5 changes: 5 additions & 0 deletions writev_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !unix

package ch

const writevAvailable = false
5 changes: 5 additions & 0 deletions writev_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build unix

package ch

const writevAvailable = true

0 comments on commit 716154a

Please sign in to comment.