diff --git a/cmd/cayley/command/database.go b/cmd/cayley/command/database.go index 700d1b171..01ac2c748 100644 --- a/cmd/cayley/command/database.go +++ b/cmd/cayley/command/database.go @@ -111,9 +111,15 @@ func NewLoadDatabaseCmd() *cobra.Command { } defer h.Close() + qw, err := h.NewQuadWriter() + if err != nil { + return err + } + defer qw.Close() + // TODO: check read-only flag in config before that? typ, _ := cmd.Flags().GetString(flagLoadFormat) - if err = internal.Load(h.QuadWriter, quad.DefaultBatch, load, typ); err != nil { + if err = internal.Load(qw, quad.DefaultBatch, load, typ); err != nil { return err } @@ -240,10 +246,17 @@ func openForQueries(cmd *cobra.Command) (*graph.Handle, error) { load = load2 } if load != "" { + qw, err := h.NewQuadWriter() + if err != nil { + h.Close() + return nil, err + } + defer qw.Close() + typ, _ := cmd.Flags().GetString(flagLoadFormat) // TODO: check read-only flag in config before that? start := time.Now() - if err = internal.Load(h.QuadWriter, quad.DefaultBatch, load, typ); err != nil { + if err = internal.Load(qw, quad.DefaultBatch, load, typ); err != nil { h.Close() return nil, err } diff --git a/graph/gaedatastore/quadstore.go b/graph/gaedatastore/quadstore.go index 7830a5c24..760b2767c 100644 --- a/graph/gaedatastore/quadstore.go +++ b/graph/gaedatastore/quadstore.go @@ -164,6 +164,46 @@ func (qs *QuadStore) ForRequest(r *http.Request) (graph.QuadStore, error) { return &QuadStore{context: appengine.NewContext(r)}, nil } +func (qs *QuadStore) NewQuadWriter() (quad.WriteCloser, error) { + return &quadWriter{qs: qs}, nil +} + +type quadWriter struct { + qs *QuadStore + deltas []graph.Delta +} + +func (w *quadWriter) WriteQuad(q quad.Quad) error { + _, err := w.WriteQuads([]quad.Quad{q}) + return err +} + +func (w *quadWriter) WriteQuads(buf []quad.Quad) (int, error) { + // TODO(dennwc): write an optimized implementation + w.deltas = w.deltas[:0] + if cap(w.deltas) < len(buf) { + w.deltas = make([]graph.Delta, 0, len(buf)) + } + for _, q := range buf { + w.deltas = append(w.deltas, graph.Delta{ + Quad: q, Action: graph.Add, + }) + } + err := w.qs.ApplyDeltas(w.deltas, graph.IgnoreOpts{ + IgnoreDup: true, + }) + w.deltas = w.deltas[:0] + if err != nil { + return 0, err + } + return len(buf), nil +} + +func (w *quadWriter) Close() error { + w.deltas = nil + return nil +} + func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreOpts graph.IgnoreOpts) error { if qs.context == nil { return errors.New("No context, graph not correctly initialised") diff --git a/graph/graphmock/graphmock.go b/graph/graphmock/graphmock.go index a0be5f2d1..fdb4cb863 100644 --- a/graph/graphmock/graphmock.go +++ b/graph/graphmock/graphmock.go @@ -52,6 +52,24 @@ func (qs *Oldstore) ValueOf(s quad.Value) graph.Ref { return nil } +func (qs *Oldstore) NewQuadWriter() (quad.WriteCloser, error) { + return nopWriter{}, nil +} + +type nopWriter struct{} + +func (nopWriter) WriteQuad(q quad.Quad) error { + return nil +} + +func (nopWriter) WriteQuads(buf []quad.Quad) (int, error) { + return len(buf), nil +} + +func (nopWriter) Close() error { + return nil +} + func (qs *Oldstore) ApplyDeltas([]graph.Delta, graph.IgnoreOpts) error { return nil } func (qs *Oldstore) Quad(graph.Ref) quad.Quad { return quad.Quad{} } @@ -122,6 +140,10 @@ func (qs *Store) ValueOf(s quad.Value) graph.Ref { func (qs *Store) ApplyDeltas([]graph.Delta, graph.IgnoreOpts) error { return nil } +func (qs *Store) NewQuadWriter() (quad.WriteCloser, error) { + return nopWriter{}, nil +} + type quadValue struct { q quad.Quad } diff --git a/graph/graphtest/graphtest.go b/graph/graphtest/graphtest.go index 10f16fc92..5751542a4 100644 --- a/graph/graphtest/graphtest.go +++ b/graph/graphtest/graphtest.go @@ -2,6 +2,7 @@ package graphtest import ( "context" + "fmt" "math" "sort" "testing" @@ -71,7 +72,12 @@ func TestAll(t *testing.T, gen testutil.DatabaseFunc, conf *Config) { TestWriters(t, gen, conf) }) t.Run("1k", func(t *testing.T) { - Test1K(t, gen, conf) + t.Run("tx", func(t *testing.T) { + Test1K(t, gen, conf) + }) + t.Run("batch", func(t *testing.T) { + Test1KBatch(t, gen, conf) + }) }) t.Run("paths", func(t *testing.T) { pathtest.RunTestMorphisms(t, gen) @@ -81,9 +87,12 @@ func TestAll(t *testing.T, gen testutil.DatabaseFunc, conf *Config) { }) } -func BenchmarkAll(t *testing.B, gen testutil.DatabaseFunc, conf *Config) { - t.Run("integration", func(t *testing.B) { - BenchmarkIntegration(t, gen, conf.AlwaysRunIntegration) +func BenchmarkAll(b *testing.B, gen testutil.DatabaseFunc, conf *Config) { + b.Run("import", func(b *testing.B) { + BenchmarkImport(b, gen) + }) + b.Run("integration", func(b *testing.B) { + BenchmarkIntegration(b, gen, conf.AlwaysRunIntegration) }) } @@ -294,6 +303,31 @@ func TestLoadDupRaw(t testing.TB, gen testutil.DatabaseFunc, c *Config) { } func TestWriters(t *testing.T, gen testutil.DatabaseFunc, c *Config) { + t.Run("batch", func(t *testing.T) { + qs, _, closer := gen(t) + defer closer() + + w, err := qs.NewQuadWriter() + require.NoError(t, err) + defer w.Close() + + quads := MakeQuadSet() + q1 := quads[:len(quads)/2] + q2 := quads[len(q1):] + + n, err := w.WriteQuads(q1) + require.NoError(t, err) + require.Equal(t, len(q1), n) + + n, err = w.WriteQuads(q2) + require.NoError(t, err) + require.Equal(t, len(q2), n) + + err = w.Close() + require.NoError(t, err) + + ExpectIteratedQuads(t, qs, qs.QuadsAllIterator(), quads, true) + }) for _, mis := range []bool{false, true} { for _, dup := range []bool{false, true} { name := []byte("__") @@ -393,6 +427,36 @@ func Test1K(t *testing.T, gen testutil.DatabaseFunc, c *Config) { ExpectIteratedQuads(t, qs, qs.QuadsAllIterator(), exp, true) } +func Test1KBatch(t *testing.T, gen testutil.DatabaseFunc, c *Config) { + qs, _, closer := gen(t) + defer closer() + + pg := c.PageSize + if pg == 0 { + pg = 100 + } + n := pg*3 + 1 + + exp := make([]quad.Quad, 0, n) + for i := 0; i < n; i++ { + q := quad.Make(i, i, i, nil) + exp = append(exp, q) + } + + qw, err := qs.NewQuadWriter() + require.NoError(t, err) + defer qw.Close() + + n, err = qw.WriteQuads(exp) + require.NoError(t, err) + require.Equal(t, len(exp), n) + + err = qw.Close() + require.NoError(t, err) + + ExpectIteratedQuads(t, qs, qs.QuadsAllIterator(), exp, true) +} + type ValueSizer interface { SizeOf(graph.Ref) int64 } @@ -1136,3 +1200,53 @@ func TestDeleteReinserted(t testing.TB, gen testutil.DatabaseFunc, _ *Config) { } } + +func irif(format string, args ...interface{}) quad.IRI { + return quad.IRI(fmt.Sprintf(format, args...)) +} + +func BenchmarkImport(b *testing.B, gen testutil.DatabaseFunc) { + b.StopTimer() + + qs, _, closer := gen(b) + defer closer() + + w, err := qs.NewQuadWriter() + require.NoError(b, err) + defer w.Close() + + const ( + mult = 10 + perBatch = 100 + ) + + quads := make([]quad.Quad, 0, mult*b.N) + for i := 0; i < mult*b.N; i++ { + quads = append(quads, quad.Quad{ + Subject: irif("n%d", i/5), + Predicate: quad.IRI("sub"), + Object: irif("n%d", i/2+i%2), + }) + } + + b.ResetTimer() + b.StartTimer() + for len(quads) > 0 { + batch := quads + if len(batch) > perBatch { + batch = batch[:perBatch] + } + n, err := w.WriteQuads(batch) + if err != nil { + b.Fatal(err) + } else if n != len(batch) { + b.Fatal(n) + } + quads = quads[len(batch):] + } + err = w.Close() + if err != nil { + b.Fatal(err) + } + b.StopTimer() +} diff --git a/graph/graphtest/integration.go b/graph/graphtest/integration.go index 7855712b7..9bc98c893 100644 --- a/graph/graphtest/integration.go +++ b/graph/graphtest/integration.go @@ -58,10 +58,10 @@ func checkIntegration(t testing.TB, force bool) { func TestIntegration(t *testing.T, gen testutil.DatabaseFunc, force bool) { checkIntegration(t, force) - h, closer := prepare(t, gen) + qs, closer := prepare(t, gen) defer closer() - checkQueries(t, h, timeout) + checkQueries(t, qs, timeout) } func BenchmarkIntegration(t *testing.B, gen testutil.DatabaseFunc, force bool) { @@ -460,23 +460,20 @@ var m1_actors = movie1.Save("","movie1").Follow(filmToActor) var m2_actors = movie2.Save("","movie2").Follow(filmToActor) ` -func prepare(t testing.TB, gen testutil.DatabaseFunc) (*graph.Handle, func()) { +func prepare(t testing.TB, gen testutil.DatabaseFunc) (graph.QuadStore, func()) { qs, _, closer := gen(t) - qw, err := graph.NewQuadWriter("single", qs, nil) - if err != nil { - closer() - require.NoError(t, err) - } - - h := &graph.Handle{QuadStore: qs, QuadWriter: qw} - const needsLoad = true // TODO: support local setup if needsLoad { + qw, err := qs.NewQuadWriter() + if err != nil { + closer() + require.NoError(t, err) + } + start := time.Now() - var err error for _, p := range []string{"./", "../"} { - err = internal.Load(h.QuadWriter, 0, filepath.Join(p, "../../data/30kmoviedata.nq.gz"), format) + err = internal.Load(qw, 0, filepath.Join(p, "../../data/30kmoviedata.nq.gz"), format) if err == nil || !os.IsNotExist(err) { break } @@ -486,16 +483,18 @@ func prepare(t testing.TB, gen testutil.DatabaseFunc) (*graph.Handle, func()) { closer() require.NoError(t, err) } + err = qw.Close() + if err != nil { + closer() + require.NoError(t, err) + } t.Logf("loaded data in %v", time.Since(start)) } - return h, func() { - qw.Close() - closer() - } + return qs, closer } -func checkQueries(t *testing.T, h *graph.Handle, timeout time.Duration) { - if h == nil { +func checkQueries(t *testing.T, qs graph.QuadStore, timeout time.Duration) { + if qs == nil { t.Fatal("not initialized") } for _, test := range queries { @@ -507,7 +506,7 @@ func checkQueries(t *testing.T, h *graph.Handle, timeout time.Duration) { t.SkipNow() } start := time.Now() - ses := gizmo.NewSession(h.QuadStore) + ses := gizmo.NewSession(qs) c := make(chan query.Result, 5) ctx := context.Background() if timeout > 0 { @@ -570,7 +569,7 @@ func convertToStringList(in []interface{}) []string { } func benchmarkQueries(b *testing.B, gen testutil.DatabaseFunc) { - h, closer := prepare(b, gen) + qs, closer := prepare(b, gen) defer closer() for _, bench := range queries { @@ -587,7 +586,7 @@ func benchmarkQueries(b *testing.B, gen testutil.DatabaseFunc) { if timeout > 0 { ctx, cancel = context.WithTimeout(ctx, timeout) } - ses := gizmo.NewSession(h.QuadStore) + ses := gizmo.NewSession(qs) b.StartTimer() go ses.Execute(ctx, bench.query, c, -1) n := 0 diff --git a/graph/kv/indexing.go b/graph/kv/indexing.go index f50a59edd..5b926dcc9 100644 --- a/graph/kv/indexing.go +++ b/graph/kv/indexing.go @@ -334,21 +334,102 @@ func (qs *QuadStore) decNodes(ctx context.Context, tx kv.Tx, deltas []graphlog.N return nil } -func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreOpts graph.IgnoreOpts) error { +func (qs *QuadStore) NewQuadWriter() (quad.WriteCloser, error) { + return &quadWriter{qs: qs}, nil +} + +type quadWriter struct { + qs *QuadStore + tx kv.Tx + err error + n int +} + +func (w *quadWriter) WriteQuad(q quad.Quad) error { + _, err := w.WriteQuads([]quad.Quad{q}) + return err +} + +func (w *quadWriter) flush() error { + w.n = 0 ctx := context.TODO() - qs.writer.Lock() - defer qs.writer.Unlock() - tx, err := qs.db.Tx(true) + if err := w.qs.flushMapBucket(ctx, w.tx); err != nil { + w.err = err + return err + } + if err := w.tx.Commit(ctx); err != nil { + w.qs.writer.Unlock() + w.tx = nil + w.err = err + return err + } + tx, err := w.qs.db.Tx(true) if err != nil { + w.qs.writer.Unlock() + w.err = err return err } - defer tx.Close() + w.tx = tx + return nil +} + +func (w *quadWriter) WriteQuads(buf []quad.Quad) (int, error) { + if w.tx == nil { + w.qs.writer.Lock() + tx, err := w.qs.db.Tx(true) + if err != nil { + w.qs.writer.Unlock() + w.err = err + return 0, err + } + w.tx = tx + } + deltas := graphlog.InsertQuads(buf) + if _, err := w.qs.applyAddDeltas(w.tx, nil, deltas, graph.IgnoreOpts{IgnoreDup: true}); err != nil { + w.err = err + return 0, err + } + w.n += len(buf) + if w.n >= quad.DefaultBatch*20 { + if err := w.flush(); err != nil { + return 0, err + } + } + return len(buf), nil +} + +func (w *quadWriter) Close() error { + if w.tx == nil { + return w.err + } + defer w.qs.writer.Unlock() + + if w.err != nil { + _ = w.tx.Close() + w.tx = nil + return w.err + } + + ctx := context.TODO() + // flush quad indexes and commit + err := w.qs.flushMapBucket(ctx, w.tx) + if err != nil { + _ = w.tx.Close() + w.tx = nil + return err + } + err = w.tx.Commit(ctx) + w.tx = nil + return err +} + +func (qs *QuadStore) applyAddDeltas(tx kv.Tx, in []graph.Delta, deltas *graphlog.Deltas, ignoreOpts graph.IgnoreOpts) (map[graph.ValueHash]resolvedNode, error) { + ctx := context.TODO() - deltas := graphlog.SplitDeltas(in) // first add all new nodes nodes, err := qs.incNodes(ctx, tx, deltas.IncNode) if err != nil { - return err + return nil, err } deltas.IncNode = nil // resolve and insert all new quads @@ -374,13 +455,17 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreOpts graph.IgnoreOpts) if !mustBeNew { p, err := qs.hasPrimitive(ctx, tx, &link, false) if err != nil { - return err + return nil, err } if p != nil { if ignoreOpts.IgnoreDup { continue // already exists, no need to insert } - return &graph.DeltaError{Delta: in[q.Ind], Err: graph.ErrQuadExists} + err = graph.ErrQuadExists + if len(in) != 0 { + return nil, &graph.DeltaError{Delta: in[q.Ind], Err: err} + } + return nil, err } } links = append(links, link) @@ -390,18 +475,37 @@ func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreOpts graph.IgnoreOpts) qstart, err := qs.genIDs(ctx, tx, len(links)) if err != nil { - return err + return nil, err } for i := range links { links[i].ID = qstart + uint64(i) links[i].Timestamp = time.Now().UnixNano() } if err := qs.indexLinks(ctx, tx, links); err != nil { + return nil, err + } + return nodes, nil +} + +func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreOpts graph.IgnoreOpts) error { + ctx := context.TODO() + qs.writer.Lock() + defer qs.writer.Unlock() + tx, err := qs.db.Tx(true) + if err != nil { + return err + } + defer tx.Close() + + deltas := graphlog.SplitDeltas(in) + + nodes, err := qs.applyAddDeltas(tx, in, deltas, ignoreOpts) + if err != nil { return err } - links = links[:0] if len(deltas.QuadDel) != 0 || len(deltas.DecNode) != 0 { + links := make([]proto.Primitive, 0, len(deltas.QuadDel)) // resolve all nodes that will be removed dnodes := make(map[graph.ValueHash]uint64, len(deltas.DecNode)) if err := qs.resolveValDeltas(ctx, tx, deltas.DecNode, func(i int, id uint64) { diff --git a/graph/log/graphlog.go b/graph/log/graphlog.go index 0dcde7a47..8fe0ff498 100644 --- a/graph/log/graphlog.go +++ b/graph/log/graphlog.go @@ -40,6 +40,41 @@ type Deltas struct { QuadDel []QuadUpdate } +func InsertQuads(in []quad.Quad) *Deltas { + hnodes := make(map[graph.ValueHash]*NodeUpdate, len(in)*2) + quadAdd := make([]QuadUpdate, 0, len(in)) + for i, qd := range in { + var q graph.QuadHash + for _, dir := range quad.Directions { + v := qd.Get(dir) + if v == nil { + continue + } + h := graph.HashOf(v) + q.Set(dir, h) + n := hnodes[h] + if n == nil { + n = &NodeUpdate{Hash: h, Val: v} + hnodes[h] = n + } + n.RefInc++ + } + quadAdd = append(quadAdd, QuadUpdate{Ind: i, Quad: q}) + } + incNodes := make([]NodeUpdate, 0, len(hnodes)) + for _, n := range hnodes { + incNodes = append(incNodes, *n) + } + hnodes = nil + sort.Slice(incNodes, func(i, j int) bool { + return bytes.Compare(incNodes[i].Hash[:], incNodes[j].Hash[:]) < 0 + }) + return &Deltas{ + IncNode: incNodes, + QuadAdd: quadAdd, + } +} + func SplitDeltas(in []graph.Delta) *Deltas { hnodes := make(map[graph.ValueHash]*NodeUpdate, len(in)*2) quadAdd := make([]QuadUpdate, 0, len(in)) diff --git a/graph/memstore/quadstore.go b/graph/memstore/quadstore.go index 7134df8b0..82018cc3e 100644 --- a/graph/memstore/quadstore.go +++ b/graph/memstore/quadstore.go @@ -313,6 +313,30 @@ func (qs *QuadStore) WriteQuads(buf []quad.Quad) (int, error) { return len(buf), nil } +func (qs *QuadStore) NewQuadWriter() (quad.WriteCloser, error) { + return &quadWriter{qs: qs}, nil +} + +type quadWriter struct { + qs *QuadStore +} + +func (w *quadWriter) WriteQuad(q quad.Quad) error { + w.qs.AddQuad(q) + return nil +} + +func (w *quadWriter) WriteQuads(buf []quad.Quad) (int, error) { + for _, q := range buf { + w.qs.AddQuad(q) + } + return len(buf), nil +} + +func (w *quadWriter) Close() error { + return nil +} + func (qs *QuadStore) deleteQuadNodes(q internalQuad) { for dir := quad.Subject; dir <= quad.Label; dir++ { id := q.Dir(dir) diff --git a/graph/memstore/quadstore_test.go b/graph/memstore/quadstore_test.go index 3883c4f64..177457286 100644 --- a/graph/memstore/quadstore_test.go +++ b/graph/memstore/quadstore_test.go @@ -88,6 +88,14 @@ func TestMemstore(t *testing.T) { }) } +func BenchmarkMemstore(b *testing.B) { + graphtest.BenchmarkAll(b, func(t testing.TB) (graph.QuadStore, graph.Options, func()) { + return New(), nil, func() {} + }, &graphtest.Config{ + AlwaysRunIntegration: true, + }) +} + type pair struct { query string value int64 diff --git a/graph/nosql/quadstore.go b/graph/nosql/quadstore.go index ed3837664..900cae743 100644 --- a/graph/nosql/quadstore.go +++ b/graph/nosql/quadstore.go @@ -331,6 +331,46 @@ func (qs *QuadStore) appendLog(ctx context.Context, deltas []graph.Delta) ([]nos return w.Keys(), err } +func (qs *QuadStore) NewQuadWriter() (quad.WriteCloser, error) { + return &quadWriter{qs: qs}, nil +} + +type quadWriter struct { + qs *QuadStore + deltas []graph.Delta +} + +func (w *quadWriter) WriteQuad(q quad.Quad) error { + _, err := w.WriteQuads([]quad.Quad{q}) + return err +} + +func (w *quadWriter) WriteQuads(buf []quad.Quad) (int, error) { + // TODO(dennwc): write an optimized implementation + w.deltas = w.deltas[:0] + if cap(w.deltas) < len(buf) { + w.deltas = make([]graph.Delta, 0, len(buf)) + } + for _, q := range buf { + w.deltas = append(w.deltas, graph.Delta{ + Quad: q, Action: graph.Add, + }) + } + err := w.qs.ApplyDeltas(w.deltas, graph.IgnoreOpts{ + IgnoreDup: true, + }) + w.deltas = w.deltas[:0] + if err != nil { + return 0, err + } + return len(buf), nil +} + +func (w *quadWriter) Close() error { + w.deltas = nil + return nil +} + func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error { ctx := context.TODO() ids := make(map[quad.Value]int) diff --git a/graph/quadstore.go b/graph/quadstore.go index 7dc09ce2a..ab26725bd 100644 --- a/graph/quadstore.go +++ b/graph/quadstore.go @@ -112,6 +112,10 @@ type QuadStore interface { // is done by a replication strategy. ApplyDeltas(in []Delta, opts IgnoreOpts) error + // NewQuadWriter starts a batch quad import process. + // The order of changes is not guaranteed, neither is the order and result of concurrent ApplyDeltas. + NewQuadWriter() (quad.WriteCloser, error) + // Returns an iterator enumerating all nodes in the graph. NodesAllIterator() Iterator diff --git a/graph/shape/shape_test.go b/graph/shape/shape_test.go index a089a5bf2..c154d064f 100644 --- a/graph/shape/shape_test.go +++ b/graph/shape/shape_test.go @@ -42,6 +42,10 @@ func (qs ValLookup) OptimizeShape(s Shape) (Shape, bool) { func (qs ValLookup) ValueOf(v quad.Value) graph.Ref { return qs[v] } + +func (ValLookup) NewQuadWriter() (quad.WriteCloser, error) { + panic("not implemented") +} func (ValLookup) ApplyDeltas(_ []graph.Delta, _ graph.IgnoreOpts) error { panic("not implemented") } diff --git a/graph/sql/quadstore.go b/graph/sql/quadstore.go index 869f02fe4..6f834383b 100644 --- a/graph/sql/quadstore.go +++ b/graph/sql/quadstore.go @@ -374,6 +374,46 @@ func NodeValues(h NodeHash, v quad.Value) (ValueType, []interface{}, error) { return nodeKey, values, nil } +func (qs *QuadStore) NewQuadWriter() (quad.WriteCloser, error) { + return &quadWriter{qs: qs}, nil +} + +type quadWriter struct { + qs *QuadStore + deltas []graph.Delta +} + +func (w *quadWriter) WriteQuad(q quad.Quad) error { + _, err := w.WriteQuads([]quad.Quad{q}) + return err +} + +func (w *quadWriter) WriteQuads(buf []quad.Quad) (int, error) { + // TODO(dennwc): write an optimized implementation + w.deltas = w.deltas[:0] + if cap(w.deltas) < len(buf) { + w.deltas = make([]graph.Delta, 0, len(buf)) + } + for _, q := range buf { + w.deltas = append(w.deltas, graph.Delta{ + Quad: q, Action: graph.Add, + }) + } + err := w.qs.ApplyDeltas(w.deltas, graph.IgnoreOpts{ + IgnoreDup: true, + }) + w.deltas = w.deltas[:0] + if err != nil { + return 0, err + } + return len(buf), nil +} + +func (w *quadWriter) Close() error { + w.deltas = nil + return nil +} + func (qs *QuadStore) ApplyDeltas(in []graph.Delta, opts graph.IgnoreOpts) error { // first calculate values ref deltas deltas := graphlog.SplitDeltas(in) diff --git a/internal/load.go b/internal/load.go index 5fe4aba47..a78e82237 100644 --- a/internal/load.go +++ b/internal/load.go @@ -10,7 +10,6 @@ import ( "strings" "github.com/cayleygraph/cayley/clog" - "github.com/cayleygraph/cayley/graph" "github.com/cayleygraph/cayley/internal/decompressor" "github.com/cayleygraph/cayley/quad" "github.com/cayleygraph/cayley/quad/nquads" @@ -18,8 +17,8 @@ import ( // Load loads a graph from the given path and write it to qw. See // DecompressAndLoad for more information. -func Load(qw graph.QuadWriter, batch int, path, typ string) error { - return DecompressAndLoad(qw, batch, path, typ, nil) +func Load(qw quad.WriteCloser, batch int, path, typ string) error { + return DecompressAndLoad(qw, batch, path, typ) } type readCloser struct { @@ -121,7 +120,7 @@ func QuadReaderFor(path, typ string) (quad.ReadCloser, error) { // DecompressAndLoad will load or fetch a graph from the given path, decompress // it, and then call the given load function to process the decompressed graph. // If no loadFn is provided, db.Load is called. -func DecompressAndLoad(qw graph.QuadWriter, batch int, path, typ string, writerFunc func(graph.QuadWriter) graph.BatchWriter) error { +func DecompressAndLoad(qw quad.WriteCloser, batch int, path, typ string) error { if path == "" { return nil } @@ -131,16 +130,11 @@ func DecompressAndLoad(qw graph.QuadWriter, batch int, path, typ string, writerF } defer qr.Close() - if writerFunc == nil { - writerFunc = graph.NewWriter - } - dest := writerFunc(qw) - - _, err = quad.CopyBatch(&batchLogger{w: dest}, qr, batch) + _, err = quad.CopyBatch(&batchLogger{w: qw}, qr, batch) if err != nil { return fmt.Errorf("db: failed to load data: %v", err) } - return dest.Close() + return qw.Close() } type batchLogger struct {