From deff06f8500648742aecd06565611f34c50c5e29 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 3 Sep 2015 10:48:37 -0600 Subject: [PATCH] add copier service This commit adds the copier service which allows one server to copy shards from another server. This will be used for moving shards in the cluster. --- CHANGELOG.md | 1 + cmd/influxd/run/server.go | 11 + services/copier/internal/internal.pb.go | 57 ++++++ services/copier/internal/internal.proto | 9 + services/copier/service.go | 261 ++++++++++++++++++++++++ services/copier/service_test.go | 184 +++++++++++++++++ tcp/mux.go | 14 ++ tsdb/engine.go | 2 + tsdb/engine/b1/b1.go | 19 ++ tsdb/engine/bz1/bz1.go | 19 ++ tsdb/shard.go | 3 + 11 files changed, 580 insertions(+) create mode 100644 services/copier/internal/internal.pb.go create mode 100644 services/copier/internal/internal.proto create mode 100644 services/copier/service.go create mode 100644 services/copier/service_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index eda3ef352b8..e5479145474 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ With this release InfluxDB is moving to Go 1.5. - [#3916](https://github.com/influxdb/influxdb/pull/3916): New statistics and diagnostics support. Graphite first to be instrumented. - [#3901](https://github.com/influxdb/influxdb/pull/3901): Add consistency level option to influx cli Thanks @takayuki - [#3876](https://github.com/influxdb/influxdb/pull/3876): Allow the following syntax in CQs: INTO "1hPolicy".:MEASUREMENT +- [#3975](https://github.com/influxdb/influxdb/pull/3975): Add shard copy service ### Bugfixes - [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803. diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index e3f0ee2f546..956c7f707da 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -18,6 +18,7 @@ import ( "github.com/influxdb/influxdb/services/admin" "github.com/influxdb/influxdb/services/collectd" "github.com/influxdb/influxdb/services/continuous_querier" + "github.com/influxdb/influxdb/services/copier" "github.com/influxdb/influxdb/services/graphite" "github.com/influxdb/influxdb/services/hh" "github.com/influxdb/influxdb/services/httpd" @@ -57,6 +58,7 @@ type Server struct { // These references are required for the tcp muxer. ClusterService *cluster.Service SnapshotterService *snapshotter.Service + CopierService *copier.Service Monitor *monitor.Monitor @@ -134,6 +136,7 @@ func NewServer(c *Config, version string) (*Server, error) { s.appendClusterService(c.Cluster) s.appendPrecreatorService(c.Precreator) s.appendSnapshotterService() + s.appendCopierService() s.appendAdminService(c.Admin) s.appendContinuousQueryService(c.ContinuousQuery) s.appendHTTPDService(c.HTTPD) @@ -170,6 +173,13 @@ func (s *Server) appendSnapshotterService() { s.SnapshotterService = srv } +func (s *Server) appendCopierService() { + srv := copier.NewService() + srv.TSDBStore = s.TSDBStore + s.Services = append(s.Services, srv) + s.CopierService = srv +} + func (s *Server) appendRetentionPolicyService(c retention.Config) { if !c.Enabled { return @@ -324,6 +334,7 @@ func (s *Server) Open() error { s.ClusterService.Listener = mux.Listen(cluster.MuxHeader) s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader) + s.CopierService.Listener = mux.Listen(copier.MuxHeader) go mux.Serve(ln) // Open meta store. diff --git a/services/copier/internal/internal.pb.go b/services/copier/internal/internal.pb.go new file mode 100644 index 00000000000..81732a93b5d --- /dev/null +++ b/services/copier/internal/internal.pb.go @@ -0,0 +1,57 @@ +// Code generated by protoc-gen-gogo. +// source: internal/internal.proto +// DO NOT EDIT! + +/* +Package internal is a generated protocol buffer package. + +It is generated from these files: + internal/internal.proto + +It has these top-level messages: + Request + Response +*/ +package internal + +import proto "github.com/gogo/protobuf/proto" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = math.Inf + +type Request struct { + ShardID *uint64 `protobuf:"varint,1,req" json:"ShardID,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Request) Reset() { *m = Request{} } +func (m *Request) String() string { return proto.CompactTextString(m) } +func (*Request) ProtoMessage() {} + +func (m *Request) GetShardID() uint64 { + if m != nil && m.ShardID != nil { + return *m.ShardID + } + return 0 +} + +type Response struct { + Error *string `protobuf:"bytes,1,opt" json:"Error,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} + +func (m *Response) GetError() string { + if m != nil && m.Error != nil { + return *m.Error + } + return "" +} + +func init() { +} diff --git a/services/copier/internal/internal.proto b/services/copier/internal/internal.proto new file mode 100644 index 00000000000..fb14c68124e --- /dev/null +++ b/services/copier/internal/internal.proto @@ -0,0 +1,9 @@ +package internal; + +message Request { + required uint64 ShardID = 1; +} + +message Response { + optional string Error = 1; +} diff --git a/services/copier/service.go b/services/copier/service.go new file mode 100644 index 00000000000..e245fac7289 --- /dev/null +++ b/services/copier/service.go @@ -0,0 +1,261 @@ +package copier + +import ( + "encoding/binary" + "errors" + "fmt" + "io" + "log" + "net" + "os" + "strings" + "sync" + + "github.com/gogo/protobuf/proto" + "github.com/influxdb/influxdb/services/copier/internal" + "github.com/influxdb/influxdb/tcp" + "github.com/influxdb/influxdb/tsdb" +) + +//go:generate protoc --gogo_out=. internal/internal.proto + +// MuxHeader is the header byte used for the TCP muxer. +const MuxHeader = 6 + +// Service manages the listener for the endpoint. +type Service struct { + wg sync.WaitGroup + err chan error + + TSDBStore interface { + Shard(id uint64) *tsdb.Shard + } + + Listener net.Listener + Logger *log.Logger +} + +// NewService returns a new instance of Service. +func NewService() *Service { + return &Service{ + err: make(chan error), + Logger: log.New(os.Stderr, "[copier] ", log.LstdFlags), + } +} + +// Open starts the service. +func (s *Service) Open() error { + s.Logger.Println("Starting copier service") + + s.wg.Add(1) + go s.serve() + return nil +} + +// Close implements the Service interface. +func (s *Service) Close() error { + if s.Listener != nil { + s.Listener.Close() + } + s.wg.Wait() + return nil +} + +// SetLogger sets the internal logger to the logger passed in. +func (s *Service) SetLogger(l *log.Logger) { + s.Logger = l +} + +// Err returns a channel for fatal out-of-band errors. +func (s *Service) Err() <-chan error { return s.err } + +// serve serves shard copy requests from the listener. +func (s *Service) serve() { + defer s.wg.Done() + + for { + // Wait for next connection. + conn, err := s.Listener.Accept() + if err != nil && strings.Contains(err.Error(), "connection closed") { + s.Logger.Println("copier listener closed") + return + } else if err != nil { + s.Logger.Println("error accepting copier request: ", err.Error()) + continue + } + + // Handle connection in separate goroutine. + s.wg.Add(1) + go func(conn net.Conn) { + defer s.wg.Done() + defer conn.Close() + if err := s.handleConn(conn); err != nil { + s.Logger.Println(err) + } + }(conn) + } +} + +// handleConn processes conn. This is run in a separate goroutine. +func (s *Service) handleConn(conn net.Conn) error { + // Read request from connection. + req, err := s.readRequest(conn) + if err != nil { + return fmt.Errorf("read request: %s", err) + } + + // Retrieve shard. + sh := s.TSDBStore.Shard(req.GetShardID()) + + // Return error response if the shard doesn't exist. + if sh == nil { + if err := s.writeResponse(conn, &internal.Response{ + Error: proto.String(fmt.Sprintf("shard not found: id=%d", req.GetShardID())), + }); err != nil { + return fmt.Errorf("write error response: %s", err) + } + return nil + } + + // Write successful response. + if err := s.writeResponse(conn, &internal.Response{}); err != nil { + return fmt.Errorf("write response: %s", err) + } + + // Write shard to response. + if _, err := sh.WriteTo(conn); err != nil { + return fmt.Errorf("write shard: %s", err) + } + + return nil +} + +// readRequest reads and unmarshals a Request from r. +func (s *Service) readRequest(r io.Reader) (*internal.Request, error) { + // Read request length. + var n uint32 + if err := binary.Read(r, binary.BigEndian, &n); err != nil { + return nil, fmt.Errorf("read request length: %s", err) + } + + // Read body. + buf := make([]byte, n) + if _, err := io.ReadFull(r, buf); err != nil { + return nil, fmt.Errorf("read request: %s", err) + } + + // Unmarshal request. + req := &internal.Request{} + if err := proto.Unmarshal(buf, req); err != nil { + return nil, fmt.Errorf("unmarshal request: %s", err) + } + + return req, nil +} + +// writeResponse marshals and writes a Response to w. +func (s *Service) writeResponse(w io.Writer, resp *internal.Response) error { + // Marshal the response to a byte slice. + buf, err := proto.Marshal(resp) + if err != nil { + return fmt.Errorf("marshal error: %s", err) + } + + // Write response length to writer. + if err := binary.Write(w, binary.BigEndian, uint32(len(buf))); err != nil { + return fmt.Errorf("write response length error: %s", err) + } + + // Write body to writer. + if _, err := w.Write(buf); err != nil { + return fmt.Errorf("write body error: %s", err) + } + + return nil +} + +// Client represents a client for connecting remotely to a copier service. +type Client struct { + host string +} + +// NewClient return a new instance of Client. +func NewClient(host string) *Client { + return &Client{ + host: host, + } +} + +// ShardReader returns a reader for streaming shard data. +// Returned ReadCloser must be closed by the caller. +func (c *Client) ShardReader(id uint64) (io.ReadCloser, error) { + // Connect to remote server. + conn, err := tcp.Dial("tcp", c.host, MuxHeader) + if err != nil { + return nil, err + } + + // Send request to server. + if err := c.writeRequest(conn, &internal.Request{ShardID: proto.Uint64(id)}); err != nil { + return nil, fmt.Errorf("write request: %s", err) + } + + // Read response from the server. + resp, err := c.readResponse(conn) + if err != nil { + return nil, fmt.Errorf("read response: %s", err) + } + + // If there was an error then return it and close connection. + if resp.GetError() != "" { + conn.Close() + return nil, errors.New(resp.GetError()) + } + + // Returning remaining stream for caller to consume. + return conn, nil +} + +// writeRequest marshals and writes req to w. +func (c *Client) writeRequest(w io.Writer, req *internal.Request) error { + // Marshal request. + buf, err := proto.Marshal(req) + if err != nil { + return fmt.Errorf("marshal request: %s", err) + } + + // Write request length. + if err := binary.Write(w, binary.BigEndian, uint32(len(buf))); err != nil { + return fmt.Errorf("write request length: %s", err) + } + + // Send request to server. + if _, err := w.Write(buf); err != nil { + return fmt.Errorf("write request body: %s", err) + } + + return nil +} + +// readResponse reads and unmarshals a Response from r. +func (c *Client) readResponse(r io.Reader) (*internal.Response, error) { + // Read response length. + var n uint32 + if err := binary.Read(r, binary.BigEndian, &n); err != nil { + return nil, fmt.Errorf("read response length: %s", err) + } + + // Read response. + buf := make([]byte, n) + if _, err := io.ReadFull(r, buf); err != nil { + return nil, fmt.Errorf("read response: %s", err) + } + + // Unmarshal response. + resp := &internal.Response{} + if err := proto.Unmarshal(buf, resp); err != nil { + return nil, fmt.Errorf("unmarshal response: %s", err) + } + + return resp, nil +} diff --git a/services/copier/service_test.go b/services/copier/service_test.go new file mode 100644 index 00000000000..a5266087d7f --- /dev/null +++ b/services/copier/service_test.go @@ -0,0 +1,184 @@ +package copier_test + +import ( + "bytes" + "encoding/binary" + "io" + "io/ioutil" + "log" + "net" + "os" + "path/filepath" + "testing" + + "github.com/influxdb/influxdb/services/copier" + "github.com/influxdb/influxdb/tcp" + "github.com/influxdb/influxdb/tsdb" + _ "github.com/influxdb/influxdb/tsdb/engine" +) + +// Ensure the service can return shard data. +func TestService_handleConn(t *testing.T) { + s := MustOpenService() + defer s.Close() + + // Mock shard. + sh := MustOpenShard(123) + defer sh.Close() + s.TSDBStore.ShardFn = func(id uint64) *tsdb.Shard { + if id != 123 { + t.Fatalf("unexpected id: %d", id) + } + return sh.Shard + } + + // Create client and request shard from service. + c := copier.NewClient(s.Addr().String()) + r, err := c.ShardReader(123) + if err != nil { + t.Fatal(err) + } else if r == nil { + t.Fatal("expected reader") + } + defer r.Close() + + // Slurp from reader. + var n uint64 + if err := binary.Read(r, binary.BigEndian, &n); err != nil { + t.Fatal(err) + } + buf := make([]byte, n) + if _, err := io.ReadFull(r, buf); err != nil { + t.Fatal(err) + } + + // Read database from disk. + exp, err := ioutil.ReadFile(sh.Path()) + if err != nil { + t.Fatal(err) + } + + // Trim expected bytes since bolt won't read beyond the HWM. + exp = exp[0:len(buf)] + + // Compare disk and reader contents. + if !bytes.Equal(exp, buf) { + t.Fatalf("data mismatch: exp=len(%d), got=len(%d)", len(exp), len(buf)) + } +} + +// Ensure the service can return an error to the client. +func TestService_handleConn_Error(t *testing.T) { + s := MustOpenService() + defer s.Close() + + // Mock missing shard. + s.TSDBStore.ShardFn = func(id uint64) *tsdb.Shard { return nil } + + // Create client and request shard from service. + c := copier.NewClient(s.Addr().String()) + r, err := c.ShardReader(123) + if err == nil || err.Error() != `shard not found: id=123` { + t.Fatalf("unexpected error: %s", err) + } else if r != nil { + t.Fatal("expected nil reader") + } +} + +// Service represents a test wrapper for copier.Service. +type Service struct { + *copier.Service + + ln net.Listener + TSDBStore ServiceTSDBStore +} + +// NewService returns a new instance of Service. +func NewService() *Service { + s := &Service{ + Service: copier.NewService(), + } + s.Service.TSDBStore = &s.TSDBStore + + if !testing.Verbose() { + s.SetLogger(log.New(ioutil.Discard, "", 0)) + } + return s +} + +// MustOpenService returns a new, opened service. Panic on error. +func MustOpenService() *Service { + // Open randomly assigned port. + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + panic(err) + } + + // Start muxer. + mux := tcp.NewMux() + + // Create new service and attach mux'd listener. + s := NewService() + s.ln = ln + s.Listener = mux.Listen(copier.MuxHeader) + go mux.Serve(ln) + + if err := s.Open(); err != nil { + panic(err) + } + + return s +} + +// Close shuts down the service and the attached listener. +func (s *Service) Close() error { + s.ln.Close() + err := s.Service.Close() + return err +} + +// Addr returns the address of the service. +func (s *Service) Addr() net.Addr { return s.ln.Addr() } + +// ServiceTSDBStore is a mock that implements copier.Service.TSDBStore. +type ServiceTSDBStore struct { + ShardFn func(id uint64) *tsdb.Shard +} + +func (ss *ServiceTSDBStore) Shard(id uint64) *tsdb.Shard { return ss.ShardFn(id) } + +// Shard is a test wrapper for tsdb.Shard. +type Shard struct { + *tsdb.Shard + path string +} + +// MustOpenShard returns a temporary, opened shard. +func MustOpenShard(id uint64) *Shard { + path, err := ioutil.TempDir("", "copier-") + if err != nil { + panic(err) + } + + sh := &Shard{ + Shard: tsdb.NewShard(id, + tsdb.NewDatabaseIndex(), + filepath.Join(path, "data"), + filepath.Join(path, "wal"), + tsdb.NewEngineOptions(), + ), + path: path, + } + if err := sh.Open(); err != nil { + sh.Close() + panic(err) + } + + return sh +} + +func (sh *Shard) Close() error { + err := sh.Shard.Close() + os.RemoveAll(sh.Path()) + return err +} diff --git a/tcp/mux.go b/tcp/mux.go index 30583227e9c..324191cf14d 100644 --- a/tcp/mux.go +++ b/tcp/mux.go @@ -126,3 +126,17 @@ func (ln *listener) Close() error { return nil } // Addr always returns nil. func (ln *listener) Addr() net.Addr { return nil } + +// Dial connects to a remote mux listener with a given header byte. +func Dial(network, address string, header byte) (net.Conn, error) { + conn, err := net.Dial(network, address) + if err != nil { + return nil, err + } + + if _, err := conn.Write([]byte{header}); err != nil { + return nil, fmt.Errorf("write mux header: %s", err) + } + + return conn, nil +} diff --git a/tsdb/engine.go b/tsdb/engine.go index 71da46a2743..f2d1332f3f8 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -33,6 +33,8 @@ type Engine interface { DeleteSeries(keys []string) error DeleteMeasurement(name string, seriesKeys []string) error SeriesCount() (n int, err error) + + io.WriterTo } // NewEngineFunc creates a new engine. diff --git a/tsdb/engine/b1/b1.go b/tsdb/engine/b1/b1.go index 90ed8f294fe..ce9cde29229 100644 --- a/tsdb/engine/b1/b1.go +++ b/tsdb/engine/b1/b1.go @@ -524,6 +524,25 @@ func (e *Engine) Begin(writable bool) (tsdb.Tx, error) { // DB returns the underlying Bolt database. func (e *Engine) DB() *bolt.DB { return e.db } +// WriteTo writes the length and contents of the engine to w. +func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { + tx, err := e.db.Begin(false) + if err != nil { + return 0, err + } + defer tx.Rollback() + + // Write size. + if err := binary.Write(w, binary.BigEndian, uint64(tx.Size())); err != nil { + return 0, err + } + + // Write data. + n, err = tx.WriteTo(w) + n += 8 // size header + return +} + // Tx represents a transaction. type Tx struct { *bolt.Tx diff --git a/tsdb/engine/bz1/bz1.go b/tsdb/engine/bz1/bz1.go index 11910eca7f3..848875a2aff 100644 --- a/tsdb/engine/bz1/bz1.go +++ b/tsdb/engine/bz1/bz1.go @@ -574,6 +574,25 @@ func (e *Engine) SeriesBucketStats(key string) (stats bolt.BucketStats, err erro return stats, err } +// WriteTo writes the length and contents of the engine to w. +func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { + tx, err := e.db.Begin(false) + if err != nil { + return 0, err + } + defer tx.Rollback() + + // Write size. + if err := binary.Write(w, binary.BigEndian, uint64(tx.Size())); err != nil { + return 0, err + } + + // Write data. + n, err = tx.WriteTo(w) + n += 8 // size header + return +} + // Stats represents internal engine statistics. type Stats struct { Size int64 // BoltDB data size diff --git a/tsdb/shard.go b/tsdb/shard.go index d1c4dc1f16a..85f94ef3226 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -382,6 +382,9 @@ func (s *Shard) validateSeriesAndFields(points []Point) ([]*SeriesCreate, []*Fie // SeriesCount returns the number of series buckets on the shard. func (s *Shard) SeriesCount() (int, error) { return s.engine.SeriesCount() } +// WriteTo writes the shard's data to w. +func (s *Shard) WriteTo(w io.Writer) (int64, error) { return s.engine.WriteTo(w) } + type MeasurementFields struct { Fields map[string]*Field `json:"fields"` Codec *FieldCodec