Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize batch import to KV backends #803

Merged
merged 5 commits into from
Jul 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions cmd/cayley/command/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,15 @@ func NewLoadDatabaseCmd() *cobra.Command {
}
defer h.Close()

qw, err := h.NewQuadWriter()
if err != nil {
return err
}
defer qw.Close()

// TODO: check read-only flag in config before that?
typ, _ := cmd.Flags().GetString(flagLoadFormat)
if err = internal.Load(h.QuadWriter, quad.DefaultBatch, load, typ); err != nil {
if err = internal.Load(qw, quad.DefaultBatch, load, typ); err != nil {
return err
}

Expand Down Expand Up @@ -240,10 +246,17 @@ func openForQueries(cmd *cobra.Command) (*graph.Handle, error) {
load = load2
}
if load != "" {
qw, err := h.NewQuadWriter()
if err != nil {
h.Close()
return nil, err
}
defer qw.Close()

typ, _ := cmd.Flags().GetString(flagLoadFormat)
// TODO: check read-only flag in config before that?
start := time.Now()
if err = internal.Load(h.QuadWriter, quad.DefaultBatch, load, typ); err != nil {
if err = internal.Load(qw, quad.DefaultBatch, load, typ); err != nil {
h.Close()
return nil, err
}
Expand Down
40 changes: 40 additions & 0 deletions graph/gaedatastore/quadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,46 @@ func (qs *QuadStore) ForRequest(r *http.Request) (graph.QuadStore, error) {
return &QuadStore{context: appengine.NewContext(r)}, nil
}

func (qs *QuadStore) NewQuadWriter() (quad.WriteCloser, error) {
return &quadWriter{qs: qs}, nil
}

type quadWriter struct {
qs *QuadStore
deltas []graph.Delta
}

func (w *quadWriter) WriteQuad(q quad.Quad) error {
_, err := w.WriteQuads([]quad.Quad{q})
return err
}

func (w *quadWriter) WriteQuads(buf []quad.Quad) (int, error) {
// TODO(dennwc): write an optimized implementation
w.deltas = w.deltas[:0]
if cap(w.deltas) < len(buf) {
w.deltas = make([]graph.Delta, 0, len(buf))
}
for _, q := range buf {
w.deltas = append(w.deltas, graph.Delta{
Quad: q, Action: graph.Add,
})
}
err := w.qs.ApplyDeltas(w.deltas, graph.IgnoreOpts{
IgnoreDup: true,
})
w.deltas = w.deltas[:0]
if err != nil {
return 0, err
}
return len(buf), nil
}

func (w *quadWriter) Close() error {
w.deltas = nil
return nil
}

func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreOpts graph.IgnoreOpts) error {
if qs.context == nil {
return errors.New("No context, graph not correctly initialised")
Expand Down
22 changes: 22 additions & 0 deletions graph/graphmock/graphmock.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,24 @@ func (qs *Oldstore) ValueOf(s quad.Value) graph.Ref {
return nil
}

func (qs *Oldstore) NewQuadWriter() (quad.WriteCloser, error) {
return nopWriter{}, nil
}

type nopWriter struct{}

func (nopWriter) WriteQuad(q quad.Quad) error {
return nil
}

func (nopWriter) WriteQuads(buf []quad.Quad) (int, error) {
return len(buf), nil
}

func (nopWriter) Close() error {
return nil
}

func (qs *Oldstore) ApplyDeltas([]graph.Delta, graph.IgnoreOpts) error { return nil }

func (qs *Oldstore) Quad(graph.Ref) quad.Quad { return quad.Quad{} }
Expand Down Expand Up @@ -122,6 +140,10 @@ func (qs *Store) ValueOf(s quad.Value) graph.Ref {

func (qs *Store) ApplyDeltas([]graph.Delta, graph.IgnoreOpts) error { return nil }

func (qs *Store) NewQuadWriter() (quad.WriteCloser, error) {
return nopWriter{}, nil
}

type quadValue struct {
q quad.Quad
}
Expand Down
122 changes: 118 additions & 4 deletions graph/graphtest/graphtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package graphtest

import (
"context"
"fmt"
"math"
"sort"
"testing"
Expand Down Expand Up @@ -71,7 +72,12 @@ func TestAll(t *testing.T, gen testutil.DatabaseFunc, conf *Config) {
TestWriters(t, gen, conf)
})
t.Run("1k", func(t *testing.T) {
Test1K(t, gen, conf)
t.Run("tx", func(t *testing.T) {
Test1K(t, gen, conf)
})
t.Run("batch", func(t *testing.T) {
Test1KBatch(t, gen, conf)
})
})
t.Run("paths", func(t *testing.T) {
pathtest.RunTestMorphisms(t, gen)
Expand All @@ -81,9 +87,12 @@ func TestAll(t *testing.T, gen testutil.DatabaseFunc, conf *Config) {
})
}

func BenchmarkAll(t *testing.B, gen testutil.DatabaseFunc, conf *Config) {
t.Run("integration", func(t *testing.B) {
BenchmarkIntegration(t, gen, conf.AlwaysRunIntegration)
func BenchmarkAll(b *testing.B, gen testutil.DatabaseFunc, conf *Config) {
b.Run("import", func(b *testing.B) {
BenchmarkImport(b, gen)
})
b.Run("integration", func(b *testing.B) {
BenchmarkIntegration(b, gen, conf.AlwaysRunIntegration)
})
}

Expand Down Expand Up @@ -294,6 +303,31 @@ func TestLoadDupRaw(t testing.TB, gen testutil.DatabaseFunc, c *Config) {
}

func TestWriters(t *testing.T, gen testutil.DatabaseFunc, c *Config) {
t.Run("batch", func(t *testing.T) {
qs, _, closer := gen(t)
defer closer()

w, err := qs.NewQuadWriter()
require.NoError(t, err)
defer w.Close()

quads := MakeQuadSet()
q1 := quads[:len(quads)/2]
q2 := quads[len(q1):]

n, err := w.WriteQuads(q1)
require.NoError(t, err)
require.Equal(t, len(q1), n)

n, err = w.WriteQuads(q2)
require.NoError(t, err)
require.Equal(t, len(q2), n)

err = w.Close()
require.NoError(t, err)

ExpectIteratedQuads(t, qs, qs.QuadsAllIterator(), quads, true)
})
for _, mis := range []bool{false, true} {
for _, dup := range []bool{false, true} {
name := []byte("__")
Expand Down Expand Up @@ -393,6 +427,36 @@ func Test1K(t *testing.T, gen testutil.DatabaseFunc, c *Config) {
ExpectIteratedQuads(t, qs, qs.QuadsAllIterator(), exp, true)
}

func Test1KBatch(t *testing.T, gen testutil.DatabaseFunc, c *Config) {
qs, _, closer := gen(t)
defer closer()

pg := c.PageSize
if pg == 0 {
pg = 100
}
n := pg*3 + 1

exp := make([]quad.Quad, 0, n)
for i := 0; i < n; i++ {
q := quad.Make(i, i, i, nil)
exp = append(exp, q)
}

qw, err := qs.NewQuadWriter()
require.NoError(t, err)
defer qw.Close()

n, err = qw.WriteQuads(exp)
require.NoError(t, err)
require.Equal(t, len(exp), n)

err = qw.Close()
require.NoError(t, err)

ExpectIteratedQuads(t, qs, qs.QuadsAllIterator(), exp, true)
}

type ValueSizer interface {
SizeOf(graph.Ref) int64
}
Expand Down Expand Up @@ -1136,3 +1200,53 @@ func TestDeleteReinserted(t testing.TB, gen testutil.DatabaseFunc, _ *Config) {
}

}

func irif(format string, args ...interface{}) quad.IRI {
return quad.IRI(fmt.Sprintf(format, args...))
}

func BenchmarkImport(b *testing.B, gen testutil.DatabaseFunc) {
b.StopTimer()

qs, _, closer := gen(b)
defer closer()

w, err := qs.NewQuadWriter()
require.NoError(b, err)
defer w.Close()

const (
mult = 10
perBatch = 100
)

quads := make([]quad.Quad, 0, mult*b.N)
for i := 0; i < mult*b.N; i++ {
quads = append(quads, quad.Quad{
Subject: irif("n%d", i/5),
Predicate: quad.IRI("sub"),
Object: irif("n%d", i/2+i%2),
})
}

b.ResetTimer()
b.StartTimer()
for len(quads) > 0 {
batch := quads
if len(batch) > perBatch {
batch = batch[:perBatch]
}
n, err := w.WriteQuads(batch)
if err != nil {
b.Fatal(err)
} else if n != len(batch) {
b.Fatal(n)
}
quads = quads[len(batch):]
}
err = w.Close()
if err != nil {
b.Fatal(err)
}
b.StopTimer()
}
43 changes: 21 additions & 22 deletions graph/graphtest/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func checkIntegration(t testing.TB, force bool) {

func TestIntegration(t *testing.T, gen testutil.DatabaseFunc, force bool) {
checkIntegration(t, force)
h, closer := prepare(t, gen)
qs, closer := prepare(t, gen)
defer closer()

checkQueries(t, h, timeout)
checkQueries(t, qs, timeout)
}

func BenchmarkIntegration(t *testing.B, gen testutil.DatabaseFunc, force bool) {
Expand Down Expand Up @@ -460,23 +460,20 @@ var m1_actors = movie1.Save("<name>","movie1").Follow(filmToActor)
var m2_actors = movie2.Save("<name>","movie2").Follow(filmToActor)
`

func prepare(t testing.TB, gen testutil.DatabaseFunc) (*graph.Handle, func()) {
func prepare(t testing.TB, gen testutil.DatabaseFunc) (graph.QuadStore, func()) {
qs, _, closer := gen(t)

qw, err := graph.NewQuadWriter("single", qs, nil)
if err != nil {
closer()
require.NoError(t, err)
}

h := &graph.Handle{QuadStore: qs, QuadWriter: qw}

const needsLoad = true // TODO: support local setup
if needsLoad {
qw, err := qs.NewQuadWriter()
if err != nil {
closer()
require.NoError(t, err)
}

start := time.Now()
var err error
for _, p := range []string{"./", "../"} {
err = internal.Load(h.QuadWriter, 0, filepath.Join(p, "../../data/30kmoviedata.nq.gz"), format)
err = internal.Load(qw, 0, filepath.Join(p, "../../data/30kmoviedata.nq.gz"), format)
if err == nil || !os.IsNotExist(err) {
break
}
Expand All @@ -486,16 +483,18 @@ func prepare(t testing.TB, gen testutil.DatabaseFunc) (*graph.Handle, func()) {
closer()
require.NoError(t, err)
}
err = qw.Close()
if err != nil {
closer()
require.NoError(t, err)
}
t.Logf("loaded data in %v", time.Since(start))
}
return h, func() {
qw.Close()
closer()
}
return qs, closer
}

func checkQueries(t *testing.T, h *graph.Handle, timeout time.Duration) {
if h == nil {
func checkQueries(t *testing.T, qs graph.QuadStore, timeout time.Duration) {
if qs == nil {
t.Fatal("not initialized")
}
for _, test := range queries {
Expand All @@ -507,7 +506,7 @@ func checkQueries(t *testing.T, h *graph.Handle, timeout time.Duration) {
t.SkipNow()
}
start := time.Now()
ses := gizmo.NewSession(h.QuadStore)
ses := gizmo.NewSession(qs)
c := make(chan query.Result, 5)
ctx := context.Background()
if timeout > 0 {
Expand Down Expand Up @@ -570,7 +569,7 @@ func convertToStringList(in []interface{}) []string {
}

func benchmarkQueries(b *testing.B, gen testutil.DatabaseFunc) {
h, closer := prepare(b, gen)
qs, closer := prepare(b, gen)
defer closer()

for _, bench := range queries {
Expand All @@ -587,7 +586,7 @@ func benchmarkQueries(b *testing.B, gen testutil.DatabaseFunc) {
if timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, timeout)
}
ses := gizmo.NewSession(h.QuadStore)
ses := gizmo.NewSession(qs)
b.StartTimer()
go ses.Execute(ctx, bench.query, c, -1)
n := 0
Expand Down
Loading