diff --git a/posting/mvcc.go b/posting/mvcc.go index d228ad10f0d..c052422ff6e 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -235,7 +235,7 @@ func (txn *Txn) addConflictKey(conflictKey uint64) { } // FillContext updates the given transaction context with data from this transaction. -func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32) { +func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32, isErrored bool) { txn.Lock() ctx.StartTs = txn.StartTs @@ -249,7 +249,12 @@ func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32) { ctx.Keys = x.Unique(ctx.Keys) txn.Unlock() - txn.Update() + // If the trasnaction has errored out, we don't need to update it, as these values will never be read. + // Sometimes, the transaction might have failed due to timeout. If we let this trasnactino update, there + // could be deadlock with the running transaction. + if !isErrored { + txn.Update() + } txn.cache.fillPreds(ctx, gid) } diff --git a/query/vector/vector_test.go b/query/vector/vector_test.go index 7ad9dab6091..2bd4554314e 100644 --- a/query/vector/vector_test.go +++ b/query/vector/vector_test.go @@ -29,6 +29,7 @@ import ( "github.com/dgraph-io/dgo/v230/protos/api" "github.com/dgraph-io/dgraph/dgraphtest" + "github.com/dgraph-io/dgraph/x" "github.com/stretchr/testify/require" ) @@ -432,6 +433,48 @@ func TestVectorsMutateFixedLengthWithDiffrentIndexes(t *testing.T) { dropPredicate("vtest") } +func TestVectorDeadlockwithTimeout(t *testing.T) { + pred := "vtest1" + dc = dgraphtest.NewComposeCluster() + var cleanup func() + client, cleanup, err := dc.Client() + x.Panic(err) + defer cleanup() + + for i := 0; i < 5; i++ { + fmt.Println("Testing iteration: ", i) + ctx, cancel2 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel2() + err = client.LoginIntoNamespace(ctx, dgraphtest.DefaultUser, + dgraphtest.DefaultPassword, x.GalaxyNamespace) + require.NoError(t, err) + + err = client.Alter(context.Background(), &api.Operation{ + DropAttr: pred, + }) + dropPredicate(pred) + setSchema(fmt.Sprintf(vectorSchemaWithIndex, pred, "4", "euclidian")) + numVectors := 1000 + vectorSize := 10 + + randomVectors, _ := generateRandomVectors(numVectors, vectorSize, pred) + + txn := client.NewTxn() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer func() { _ = txn.Discard(ctx) }() + defer cancel() + + _, err = txn.Mutate(ctx, &api.Mutation{ + SetNquads: []byte(randomVectors), + CommitNow: true, + }) + require.Error(t, err) + + err = txn.Commit(ctx) + require.Contains(t, err.Error(), "Transaction has already been committed or discarded") + } +} + func TestVectorMutateDiffrentLengthWithDiffrentIndexes(t *testing.T) { dropPredicate("vtest") diff --git a/systest/vector/backup_test.go b/systest/vector/backup_test.go index f11eb8b75ff..f1706e44b0b 100644 --- a/systest/vector/backup_test.go +++ b/systest/vector/backup_test.go @@ -235,3 +235,54 @@ func TestVectorBackupRestoreDropIndex(t *testing.T) { } } } + +func TestVectorBackupRestoreReIndexing(t *testing.T) { + conf := dgraphtest.NewClusterConfig().WithNumAlphas(1).WithNumZeros(1).WithReplicas(1).WithACL(time.Hour) + c, err := dgraphtest.NewLocalCluster(conf) + require.NoError(t, err) + defer func() { c.Cleanup(t.Failed()) }() + require.NoError(t, c.Start()) + + gc, cleanup, err := c.Client() + require.NoError(t, err) + defer cleanup() + require.NoError(t, gc.LoginIntoNamespace(context.Background(), + dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace)) + + hc, err := c.HTTPClient() + require.NoError(t, err) + require.NoError(t, hc.LoginIntoNamespace(dgraphtest.DefaultUser, + dgraphtest.DefaultPassword, x.GalaxyNamespace)) + + require.NoError(t, gc.SetupSchema(testSchema)) + + numVectors := 1000 + pred := "project_discription_v" + rdfs, vectors := dgraphtest.GenerateRandomVectors(0, numVectors, 10, pred) + + mu := &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true} + _, err = gc.Mutate(mu) + require.NoError(t, err) + + t.Log("taking backup \n") + require.NoError(t, hc.Backup(c, false, dgraphtest.DefaultBackupDir)) + + rdfs2, vectors2 := dgraphtest.GenerateRandomVectors(numVectors, numVectors+300, 10, pred) + + mu = &api.Mutation{SetNquads: []byte(rdfs2), CommitNow: true} + _, err = gc.Mutate(mu) + require.NoError(t, err) + t.Log("restoring backup \n") + require.NoError(t, hc.Restore(c, dgraphtest.DefaultBackupDir, "", 2, 1)) + require.NoError(t, dgraphtest.WaitForRestore(c)) + + for i := 0; i < 5; i++ { + // drop index + require.NoError(t, gc.SetupSchema(testSchemaWithoutIndex)) + // add index + require.NoError(t, gc.SetupSchema(testSchema)) + } + vectors = append(vectors, vectors2...) + rdfs = rdfs + rdfs2 + testVectorQuery(t, gc, vectors, rdfs, pred, numVectors) +} diff --git a/tok/hnsw/helper.go b/tok/hnsw/helper.go index f42431a9143..033ef40a99d 100644 --- a/tok/hnsw/helper.go +++ b/tok/hnsw/helper.go @@ -433,9 +433,10 @@ func (ph *persistentHNSW[T]) createEntryAndStartNodes( err := ph.getVecFromUid(entry, c, vec) if err != nil || len(*vec) == 0 { // The entry vector has been deleted. We have to create a new entry vector. - entry, err := ph.PickStartNode(ctx, c, vec) + entry, err := ph.calculateNewEntryVec(ctx, c, vec) if err != nil { - return 0, []*index.KeyValue{}, err + // No other node exists, go with the new node that has come + return create_edges(inUuid) } return create_edges(entry) } diff --git a/worker/draft.go b/worker/draft.go index 23b92099a57..dd4230d6031 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -549,12 +549,19 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr errCh <- process(m.Edges[start:end]) }(start, end) } + // Earlier we were returning after even if one thread had an error. We should wait for + // all the transactions to finish. We call txn.Update() when this function exists. This could cause + // a deadlock with runMutation. + var errs error for i := 0; i < numGo; i++ { if err := <-errCh; err != nil { - return err + if errs == nil { + errs = errors.New("Got error while running mutation") + } + errs = errors.Wrapf(err, errs.Error()) } } - return nil + return errs } func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error { @@ -839,7 +846,10 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error { if txn == nil { return } - txn.Update() + // If the transaction has failed, we dont need to update it. + if commit != 0 { + txn.Update() + } // We start with 20 ms, so that we end up waiting 5 mins by the end. // If there is any transient issue, it should get fixed within that timeframe. err := x.ExponentialRetry(int(x.Config.MaxRetries), diff --git a/worker/mutation.go b/worker/mutation.go index abae1515dde..71b3db0cbc0 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -642,9 +642,9 @@ func Timestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) { return c.Timestamps(ctx, num) } -func fillTxnContext(tctx *api.TxnContext, startTs uint64) { +func fillTxnContext(tctx *api.TxnContext, startTs uint64, isErrored bool) { if txn := posting.Oracle().GetTxn(startTs); txn != nil { - txn.FillContext(tctx, groups().groupId()) + txn.FillContext(tctx, groups().groupId(), isErrored) } // We do not need to fill linread mechanism anymore, because transaction // start ts is sufficient to wait for, to achieve lin reads. @@ -950,7 +950,8 @@ func (w *grpcWorker) proposeAndWait(ctx context.Context, txnCtx *api.TxnContext, node := groups().Node err := node.proposeAndWait(ctx, &pb.Proposal{Mutations: m}) - fillTxnContext(txnCtx, m.StartTs) + // When we are filling txn context, we don't need to update latest delta if the transaction has failed. + fillTxnContext(txnCtx, m.StartTs, err != nil) return err }