influx_tsm: ignore series index and convert all points
A case (influxdata#5606) was found in which a large amount of data unexpectedly
disappeared from a database following a TSM conversion.

The proximate cause was an inconsistency between the root Bolt DB bucket list
and the metadata in the "series" bucket of the same shard. Apparently valid
series existed in Bolt DB buckets that were no longer referenced by the
metadata in the "series" bucket - so-called orphaned series. Because the
conversion process iterated only over the series found in the metadata, it
removed the orphaned series from the converted shards. The result was the
unexpected removal of data from the TSM shards - data that had previously been
accessible in the b1 shards despite the metadata inconsistency.
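
To make the failure mode concrete: in a b1 shard, each series is stored in its
own top-level Bolt DB bucket, while the "series" bucket holds the index the
converter used to walk them. The sketch below is not part of this commit;
reportOrphans and the shard path are illustrative names. It shows, under that
layout assumption, how the two views can be compared to surface orphans:

package main

import (
	"fmt"
	"log"

	"github.com/boltdb/bolt"
)

// Top-level buckets in a b1 shard that never represent a series.
var excludedBuckets = map[string]bool{
	"fields": true,
	"meta":   true,
	"series": true,
	"wal":    true,
}

// reportOrphans prints every series bucket that the "series" index no longer
// references. The old index-driven conversion silently skipped such series.
func reportOrphans(shardPath string) error {
	db, err := bolt.Open(shardPath, 0600, nil)
	if err != nil {
		return err
	}
	defer db.Close()

	return db.View(func(tx *bolt.Tx) error {
		index := tx.Bucket([]byte("series")) // may be missing or stale

		// Walk the root bucket list: the ground truth for what is stored.
		return tx.ForEach(func(name []byte, _ *bolt.Bucket) error {
			key := string(name)
			if excludedBuckets[key] {
				return nil
			}
			if index == nil || index.Get(name) == nil {
				fmt.Printf("orphaned series: %s\n", key)
			}
			return nil
		})
	})
}

func main() {
	if err := reportOrphans("shard.db"); err != nil {
		log.Fatal(err)
	}
}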

The root cause of the metadata inconsistency in the case above was a failure,
in versions prior to v0.9.3 (specifically, those without 3348dab), to update
the "series" bucket with series that had been created in previous shards
during the life of the same influxd process instance.

This fix is required to avoid data loss during TSM conversions for shards that
were created by versions of influx that did not include 3348dab (e.g. versions
prior to v0.9.3).

Analysis-by: Jon Seymour <[email protected]>
joelegasse authored and jonseymour committed Feb 13, 2016
1 parent cee603c commit 62711ba
Showing 3 changed files with 34 additions and 32 deletions.
34 changes: 19 additions & 15 deletions cmd/influx_tsm/b1/reader.go
@@ -15,6 +15,13 @@ const DefaultChunkSize = 1000
 
 var NoFieldsFiltered uint64
 
+var excludedBuckets = map[string]bool{
+	"fields": true,
+	"meta":   true,
+	"series": true,
+	"wal":    true,
+}
+
 // Reader is used to read all data from a b1 shard.
 type Reader struct {
 	path string
@@ -27,7 +34,6 @@ type Reader struct {
 	keyBuf    string
 	valuesBuf []tsm1.Value
 
-	series map[string]*tsdb.Series
 	fields map[string]*tsdb.MeasurementFields
 	codecs map[string]*tsdb.FieldCodec
 
@@ -38,7 +44,6 @@
 func NewReader(path string) *Reader {
 	return &Reader{
 		path:   path,
-		series: make(map[string]*tsdb.Series),
 		fields: make(map[string]*tsdb.MeasurementFields),
 		codecs: make(map[string]*tsdb.FieldCodec),
 	}
@@ -71,32 +76,31 @@ func (r *Reader) Open() error {
 		return err
 	}
 
-	// Load series
-	if err := r.db.View(func(tx *bolt.Tx) error {
-		meta := tx.Bucket([]byte("series"))
-		c := meta.Cursor()
-		for k, v := c.First(); k != nil; k, v = c.Next() {
-			series := &tsdb.Series{}
-			if err := series.UnmarshalBinary(v); err != nil {
-				return err
-			}
-			r.series[string(k)] = series
-		}
+	seriesSet := make(map[string]bool)
+
+	// ignore series index and find all series in this shard
+	if err := r.db.View(func(tx *bolt.Tx) error {
+		tx.ForEach(func(name []byte, _ *bolt.Bucket) error {
+			key := string(name)
+			if !excludedBuckets[key] {
+				seriesSet[key] = true
+			}
+			return nil
+		})
 		return nil
 	}); err != nil {
 		return err
 	}
 
-	// Create cursor for each field of each series.
 	r.tx, err = r.db.Begin(false)
 	if err != nil {
 		return err
 	}
 
-	for s := range r.series {
-		fields := r.fields[tsdb.MeasurementFromSeriesKey(s)]
+	// Create cursor for each field of each series.
+	for s := range seriesSet {
+		measurement := tsdb.MeasurementFromSeriesKey(s)
+		fields := r.fields[measurement]
 		if fields == nil {
 			atomic.AddUint64(&NoFieldsFiltered, 1)
 			continue
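
Both readers then group the recovered series keys by measurement in order to
look up the field codecs. That grouping is done by tsdb.MeasurementFromSeriesKey;
below is a stand-in sketch, assuming the conventional "measurement,tag1=v1,..."
series-key layout (the real function lives in the tsdb package):

package main

import (
	"fmt"
	"strings"
)

// measurementFromSeriesKey is a stand-in for tsdb.MeasurementFromSeriesKey,
// assuming the series key is the measurement name followed by
// comma-separated tag pairs.
func measurementFromSeriesKey(key string) string {
	if i := strings.Index(key, ","); i != -1 {
		return key[:i]
	}
	return key
}

func main() {
	fmt.Println(measurementFromSeriesKey("cpu,host=server01,region=us-west")) // cpu
}
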
28 changes: 14 additions & 14 deletions cmd/influx_tsm/bz1/reader.go
@@ -31,7 +31,6 @@ type Reader struct {
 	keyBuf    string
 	valuesBuf []tsm.Value
 
-	series map[string]*tsdb.Series
 	fields map[string]*tsdb.MeasurementFields
 	codecs map[string]*tsdb.FieldCodec
 
@@ -42,7 +41,6 @@
 func NewReader(path string) *Reader {
 	return &Reader{
 		path:      path,
-		series:    make(map[string]*tsdb.Series),
 		fields:    make(map[string]*tsdb.MeasurementFields),
 		codecs:    make(map[string]*tsdb.FieldCodec),
 		ChunkSize: DefaultChunkSize,
@@ -58,6 +56,8 @@ func (r *Reader) Open() error {
 	}
 	r.db = db
 
+	seriesSet := make(map[string]bool)
+
 	if err := r.db.View(func(tx *bolt.Tx) error {
 		var data []byte
 
@@ -66,20 +66,20 @@ func (r *Reader) Open() error {
 			// No data in this shard.
 			return nil
 		}
-		buf := meta.Get([]byte("series"))
-		if buf == nil {
-			// No data in this shard.
+
+		pointsBucket := tx.Bucket([]byte("points"))
+		if pointsBucket == nil {
 			return nil
 		}
-		data, err = snappy.Decode(nil, buf)
-		if err != nil {
-			return err
-		}
-		if err := json.Unmarshal(data, &r.series); err != nil {
+
+		if err := pointsBucket.ForEach(func(key, _ []byte) error {
+			seriesSet[string(key)] = true
+			return nil
+		}); err != nil {
 			return err
 		}
 
-		buf = meta.Get([]byte("fields"))
+		buf := meta.Get([]byte("fields"))
 		if buf == nil {
 			// No data in this shard.
 			return nil
@@ -102,15 +102,15 @@
 		r.codecs[k] = tsdb.NewFieldCodec(v.Fields)
 	}
 
-	// Create cursor for each field of each series.
 	r.tx, err = r.db.Begin(false)
 	if err != nil {
 		return err
 	}
 
-	for s := range r.series {
-		fields := r.fields[tsdb.MeasurementFromSeriesKey(s)]
+	// Create cursor for each field of each series.
+	for s := range seriesSet {
+		measurement := tsdb.MeasurementFromSeriesKey(s)
+		fields := r.fields[measurement]
 		if fields == nil {
 			atomic.AddUint64(&NoFieldsFiltered, 1)
 			continue
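
The bz1 reader reaches the same end by a different route: points live under a
top-level "points" bucket with one sub-bucket per series, so enumerating that
bucket's keys recovers every series that actually holds data, whether or not
the snappy-compressed "series" entry in "meta" still mentions it. Below is a
standalone sketch of that enumeration, assuming the layout above (collectSeries
and its argument are illustrative, not part of the commit):

package main

import (
	"fmt"
	"log"

	"github.com/boltdb/bolt"
)

// collectSeries gathers every series key that has points in a bz1 shard,
// sidestepping the possibly stale series index kept in the "meta" bucket.
func collectSeries(shardPath string) (map[string]bool, error) {
	db, err := bolt.Open(shardPath, 0600, nil)
	if err != nil {
		return nil, err
	}
	defer db.Close()

	seriesSet := make(map[string]bool)
	err = db.View(func(tx *bolt.Tx) error {
		points := tx.Bucket([]byte("points"))
		if points == nil {
			return nil // no data in this shard
		}
		// Each key under "points" names a sub-bucket holding one series.
		return points.ForEach(func(key, _ []byte) error {
			seriesSet[string(key)] = true
			return nil
		})
	})
	return seriesSet, err
}

func main() {
	series, err := collectSeries("shard.db")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("found %d series\n", len(series))
}
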
4 changes: 1 addition & 3 deletions cmd/influx_tsm/main.go
@@ -42,7 +42,7 @@ The backed-up files must be removed manually, generally after starting up the
 node again to make sure all of data has been converted correctly.
 To restore a backup:
-Shut down the node, remove the converted directory, and
+Shut down the node, remove the converted directory, and
 copy the backed-up directory to the original location.`
 
 type options struct {
@@ -54,7 +54,6 @@ type options struct {
 	Parallel       bool
 	SkipBackup     bool
 	UpdateInterval time.Duration
-	// Quiet bool
 }
 
 func (o *options) Parse() error {
@@ -67,7 +66,6 @@ func (o *options) Parse() error {
 	fs.BoolVar(&opts.Parallel, "parallel", false, "Perform parallel conversion. (up to GOMAXPROCS shards at once)")
 	fs.BoolVar(&opts.SkipBackup, "nobackup", false, "Disable database backups. Not recommended.")
 	fs.StringVar(&opts.BackupPath, "backup", "", "The location to backup up the current databases. Must not be within the data directoryi.")
-	// fs.BoolVar(&opts.Quiet, "quiet", false, "Suppresses the regular status updates.")
 	fs.StringVar(&opts.DebugAddr, "debug", "", "If set, http debugging endpoints will be enabled on the given address")
 	fs.DurationVar(&opts.UpdateInterval, "interval", 5*time.Second, "How often status updates are printed.")
 	fs.Usage = func() {
