Skip to content

Commit

Permalink
Merge pull request #176 from ipfs/expose-dirty-marks-stats
Browse files Browse the repository at this point in the history
Make store dirty-mark methods public. Add InternalStats()
  • Loading branch information
hsanjuan authored Sep 26, 2022
2 parents 6c723f4 + 157693f commit 19d9bc6
Showing 1 changed file with 44 additions and 22 deletions.
66 changes: 44 additions & 22 deletions crdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func New(
"crdt Datastore created. Number of heads: %d. Current max-height: %d. Dirty: %t",
len(headList),
maxHeight,
dstore.isDirty(),
dstore.IsDirty(),
)

// sendJobWorker + NumWorkers
Expand Down Expand Up @@ -372,7 +372,7 @@ func (store *Datastore) handleNext() {
// processed, thus it did not leave a branch
// half-processed and there's nothign to
// recover.
// disabled: store.markDirty()
// disabled: store.MarkDirty()
}
}

Expand Down Expand Up @@ -501,7 +501,7 @@ func (store *Datastore) repair() {
}
return
case <-timer.C:
if !store.isDirty() {
if !store.IsDirty() {
store.logger.Info("store is marked clean. No need to repair")
} else {
store.logger.Warn("store is marked dirty. Starting DAG repair operation")
Expand Down Expand Up @@ -564,7 +564,7 @@ func (store *Datastore) logStats() {
len(heads),
height,
len(store.jobQueue),
store.isDirty(),
store.IsDirty(),
)
case <-store.ctx.Done():
ticker.Stop()
Expand Down Expand Up @@ -631,15 +631,15 @@ func (store *Datastore) dagWorker() {

if err != nil {
store.logger.Error(err)
store.markDirty()
store.MarkDirty()
job.session.Done()
continue
}
go func(j *dagJob) {
err := store.sendNewJobs(j.session, j.nodeGetter, j.root, j.rootPrio, children)
if err != nil {
store.logger.Error(err)
store.markDirty()
store.MarkDirty()
}
j.session.Done()
}(job)
Expand Down Expand Up @@ -722,7 +722,7 @@ func (store *Datastore) sendJobWorker() {
case <-store.ctx.Done():
if len(store.sendJobs) > 0 {
// we left something in the queue
store.markDirty()
store.MarkDirty()
}
close(store.jobQueue)
return
Expand All @@ -748,23 +748,26 @@ func (store *Datastore) dirtyKey() ds.Key {
return store.namespace.ChildString(dirtyBitKey)
}

func (store *Datastore) markDirty() {
store.logger.Error("marking datastore as dirty")
// MarkDirty marks the Datastore as dirty.
func (store *Datastore) MarkDirty() {
store.logger.Warn("marking datastore as dirty")
err := store.store.Put(store.ctx, store.dirtyKey(), nil)
if err != nil {
store.logger.Errorf("error setting dirty bit: %s", err)
}
}

func (store *Datastore) isDirty() bool {
// IsDirty returns whether the datastore is marked dirty.
func (store *Datastore) IsDirty() bool {
ok, err := store.store.Has(store.ctx, store.dirtyKey())
if err != nil {
store.logger.Errorf("error checking dirty bit: %s", err)
}
return ok
}

func (store *Datastore) markClean() {
// MarkClean removes the dirty mark from the datastore.
func (store *Datastore) MarkClean() {
store.logger.Info("marking datastore as clean")
err := store.store.Delete(store.ctx, store.dirtyKey())
if err != nil {
Expand Down Expand Up @@ -880,7 +883,7 @@ func (store *Datastore) processNode(ng *crdtNodeGetter, root cid.Cid, rootPrio u
return children, nil
}

// repairDAG is used to walk down the chain until a non-processed node is
// RepairDAG is used to walk down the chain until a non-processed node is
// found and at that moment, queues it for processing.
func (store *Datastore) repairDAG() error {
start := time.Now()
Expand Down Expand Up @@ -986,7 +989,7 @@ func (store *Datastore) repairDAG() error {

// If we are here we have successfully reprocessed the chain until the
// bottom.
store.markClean()
store.MarkClean()
return nil
}

Expand Down Expand Up @@ -1023,15 +1026,14 @@ func (store *Datastore) GetSize(ctx context.Context, key ds.Key) (size int, err
// Query searches the datastore and returns a query result. This function
// may return before the query actually runs. To wait for the query:
//
// result, _ := ds.Query(q)
// result, _ := ds.Query(q)
//
// // use the channel interface; result may come in at different times
// for entry := range result.Next() { ... }
//
// // or wait for the query to be completely done
// entries, _ := result.Rest()
// for entry := range entries { ... }
// // use the channel interface; result may come in at different times
// for entry := range result.Next() { ... }
//
// // or wait for the query to be completely done
// entries, _ := result.Rest()
// for entry := range entries { ... }
func (store *Datastore) Query(ctx context.Context, q query.Query) (query.Results, error) {
qr, err := store.set.Elements(ctx, q)
if err != nil {
Expand Down Expand Up @@ -1108,7 +1110,7 @@ func (store *Datastore) Sync(ctx context.Context, prefix ds.Key) error {
func (store *Datastore) Close() error {
store.cancel()
store.wg.Wait()
if store.isDirty() {
if store.IsDirty() {
store.logger.Warn("datastore is being closed marked as dirty")
}
return nil
Expand Down Expand Up @@ -1257,7 +1259,7 @@ func (store *Datastore) addDAGNode(delta *pb.Delta) (cid.Cid, error) {
nd,
)
if err != nil {
store.markDirty() // not sure if this will fix much if this happens.
store.MarkDirty() // not sure if this will fix much if this happens.
return cid.Undef, errors.Wrap(err, "error processing new block")
}
if len(children) != 0 {
Expand Down Expand Up @@ -1464,6 +1466,26 @@ func (store *Datastore) dotDAGRec(w io.Writer, from cid.Cid, depth uint64, ng *c
return nil
}

// Stats wraps internal information about the datastore.
// Might be expanded in the future.
type Stats struct {
Heads []cid.Cid
MaxHeight uint64
QueuedJobs int
}

// InternalStats returns internal datastore information like the current heads
// and max height.
func (store *Datastore) InternalStats() Stats {
heads, height, _ := store.heads.List()

return Stats{
Heads: heads,
MaxHeight: height,
QueuedJobs: len(store.jobQueue),
}
}

type cidSafeSet struct {
set map[cid.Cid]struct{}
mux sync.RWMutex
Expand Down

0 comments on commit 19d9bc6

Please sign in to comment.