Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/query record incomplete #220

Merged
merged 2 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion miner/multiminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func (m *Miner) mine(ctx context.Context) {
}
if err != nil {
log.Errorf("failed to create block: %s", err)
rcd.Record(ctx, recorder.Records{"error": fmt.Sprintf("failed to create block: %s", err), "end": build.Clock.Now().String()})
continue
}
blks = append(blks, b)
Expand Down Expand Up @@ -606,7 +607,9 @@ func (m *Miner) mineOneForAll(ctx context.Context, base *MiningBase) []*winPoStR

go func() {
defer wg.Done()
defer rcd.Record(ctx, recorder.Records{"end": build.Clock.Now().String()})
defer func() {
rcd.Record(ctx, recorder.Records{"end": build.Clock.Now().String()})
}()

// set timeout for miner once
tCtx, tCtxCancel := context.WithTimeout(ctx, m.MinerOnceTimeout)
Expand Down
7 changes: 3 additions & 4 deletions node/modules/mine-recorder/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func (d *DefaultRecorder) Query(ctx context.Context, miner address.Address, from
}
covered := coverMap(r, Records{"miner": miner.String(), "epoch": from.String()})
ret = append(ret, covered)
from++
}

if len(ret) != int(limit) {
Expand Down Expand Up @@ -227,12 +226,12 @@ func fromBytes(b []byte, v any) {
}

// mergeMap cover src onto dst, the key in dst will overwrite the key in src
func coverMap[K comparable, V any](dst, src map[K]V) map[K]V {
func coverMap[K comparable, V any](before, after map[K]V) map[K]V {
newMap := make(map[K]V)
for k, v := range dst {
for k, v := range before {
newMap[k] = v
}
for k, v := range src {
for k, v := range after {
newMap[k] = v
}
return newMap
Expand Down
64 changes: 32 additions & 32 deletions node/modules/mine-recorder/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package minerecorder
import (
"context"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -43,12 +42,16 @@ func TestDefaultRecorder(t *testing.T) {

err = r.Record(ctx, newIDAddress(1), abi.ChainEpoch(9), RecordExample)
require.NoError(t, err)
err = r.Record(ctx, newIDAddress(1), abi.ChainEpoch(10), RecordExample)
require.NoError(t, err)
err = r.Record(ctx, newIDAddress(1), abi.ChainEpoch(11), RecordExample)
require.NoError(t, err)
err = r.Record(ctx, newIDAddress(1), abi.ChainEpoch(14), RecordExample)
require.NoError(t, err)

rets, err := r.Query(ctx, newIDAddress(1), abi.ChainEpoch(5), 10)
require.NoError(t, err)
require.Equal(t, 2, len(rets))
require.Equal(t, 4, len(rets))
})

t.Run("query exceed MaxRecordPerQuery", func(t *testing.T) {
Expand Down Expand Up @@ -112,6 +115,25 @@ func TestDefaultRecorder(t *testing.T) {
})
}

func TestCoverMap(t *testing.T) {
t.Run("cover map", func(t *testing.T) {
before := Records{
"key1": "val1",
"key2": "val2",
}
after := Records{
"key1": "val4",
"key3": "val3",
}
ret := coverMap(before, after)
require.Equal(t, Records{
"key1": "val4",
"key2": "val2",
"key3": "val3",
}, ret)
})
}

func createDatastore(t testing.TB) datastore.Batching {
path := t.TempDir() + "/leveldb"
db, err := leveldb.NewDatastore(path, nil)
Expand Down Expand Up @@ -145,37 +167,15 @@ func BenchmarkQuery(b *testing.B) {
}
}

func TestRWMutex(t *testing.T) {
lk := sync.RWMutex{}

for i := 0; i < 10; i++ {
go read(&lk, i)
}
for i := 0; i < 2; i++ {
go write(&lk, i)
}
time.Sleep(time.Second * 1)
for i := 0; i < 10; i++ {
go read(&lk, i+10)
}

time.Sleep(time.Second * 10)
func TestDefer(t *testing.T) {
outputTime()
}

func read(lk *sync.RWMutex, i int) {
time.Sleep(time.Millisecond * 100 * time.Duration(i))
fmt.Println("request read lock", i, time.Now())
lk.RLock()
defer lk.RUnlock()
println("read", i, time.Now().String())
time.Sleep(time.Second * 2)
}
func outputTime() {
fmt.Println(time.Now())
defer func() {

func write(lk *sync.RWMutex, i int) {
time.Sleep(time.Millisecond * 100 * time.Duration(i))
fmt.Println("request write lock", i, time.Now())
lk.Lock()
defer lk.Unlock()
println("write", i, time.Now().String())
time.Sleep(time.Second * 2)
fmt.Println(time.Now())
}()
time.Sleep(3 * time.Second)
}