Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix(core): Fix deadlock in runMutation and error handling #9085

Merged
merged 7 commits into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (txn *Txn) addConflictKey(conflictKey uint64) {
}

// FillContext updates the given transaction context with data from this transaction.
func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32) {
func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32, isErrored bool) {
txn.Lock()
ctx.StartTs = txn.StartTs

Expand All @@ -249,7 +249,12 @@ func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32) {
ctx.Keys = x.Unique(ctx.Keys)

txn.Unlock()
txn.Update()
// If the transaction has errored out, we don't need to update it, as these values will never be read.
// Sometimes, the transaction might have failed due to a timeout. If we let this transaction update, there
// could be a deadlock with the running transaction.
if !isErrored {
txn.Update()
}
txn.cache.fillPreds(ctx, gid)
}

Expand Down
43 changes: 43 additions & 0 deletions query/vector/vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/dgraph-io/dgo/v230/protos/api"
"github.com/dgraph-io/dgraph/dgraphtest"
"github.com/dgraph-io/dgraph/x"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -432,6 +433,48 @@ func TestVectorsMutateFixedLengthWithDiffrentIndexes(t *testing.T) {
dropPredicate("vtest")
}

// TestVectorDeadlockwithTimeout repeatedly issues a large vector mutation with a
// one-second client-side timeout and asserts that the mutation fails and that
// the transaction is reported as already finished on a subsequent Commit. It is
// run several times in a row so that a server left stuck by an earlier,
// timed-out mutation would show up as a failure in a later iteration.
func TestVectorDeadlockwithTimeout(t *testing.T) {
	pred := "vtest1"
	dc = dgraphtest.NewComposeCluster()
	client, cleanup, err := dc.Client()
	require.NoError(t, err)
	defer cleanup()

	// Each iteration runs in its own function so that its deferred cleanups
	// (context cancellation, transaction discard) fire at the end of that
	// iteration instead of piling up until the whole test returns.
	runIteration := func(i int) {
		t.Logf("Testing iteration: %d", i)

		loginCtx, cancelLogin := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancelLogin()
		require.NoError(t, client.LoginIntoNamespace(loginCtx, dgraphtest.DefaultUser,
			dgraphtest.DefaultPassword, x.GalaxyNamespace))

		// Best-effort drop: the original code never inspected this error, so it
		// is only logged here rather than turned into a test failure.
		if dropErr := client.Alter(context.Background(), &api.Operation{
			DropAttr: pred,
		}); dropErr != nil {
			t.Logf("dropping predicate %q: %v", pred, dropErr)
		}
		dropPredicate(pred)
		setSchema(fmt.Sprintf(vectorSchemaWithIndex, pred, "4", "euclidian"))

		numVectors := 1000
		vectorSize := 10
		randomVectors, _ := generateRandomVectors(numVectors, vectorSize, pred)

		txn := client.NewTxn()
		mutCtx, cancelMut := context.WithTimeout(context.Background(), time.Second)
		// Defers run last-in-first-out: register cancel first so that Discard
		// executes before the context is explicitly cancelled.
		defer cancelMut()
		defer func() { _ = txn.Discard(mutCtx) }()

		_, mutErr := txn.Mutate(mutCtx, &api.Mutation{
			SetNquads: []byte(randomVectors),
			CommitNow: true,
		})
		require.Error(t, mutErr)

		// Assert the error is non-nil before dereferencing it, so an unexpected
		// successful Commit fails the test instead of panicking.
		commitErr := txn.Commit(mutCtx)
		require.Error(t, commitErr)
		require.Contains(t, commitErr.Error(), "Transaction has already been committed or discarded")
	}

	for i := 0; i < 5; i++ {
		runIteration(i)
	}
}

func TestVectorMutateDiffrentLengthWithDiffrentIndexes(t *testing.T) {
dropPredicate("vtest")

Expand Down
51 changes: 51 additions & 0 deletions systest/vector/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,54 @@ func TestVectorBackupRestoreDropIndex(t *testing.T) {
}
}
}

// TestVectorBackupRestoreReIndexing starts a single-alpha, single-zero local
// cluster with ACL enabled, loads a first batch of random vectors, takes a
// backup, loads a second batch, restores the backup, and then repeatedly
// switches the schema between testSchemaWithoutIndex and testSchema before
// running testVectorQuery over both batches.
func TestVectorBackupRestoreReIndexing(t *testing.T) {
	clusterConf := dgraphtest.NewClusterConfig().
		WithNumAlphas(1).
		WithNumZeros(1).
		WithReplicas(1).
		WithACL(time.Hour)
	cluster, err := dgraphtest.NewLocalCluster(clusterConf)
	require.NoError(t, err)
	defer func() { cluster.Cleanup(t.Failed()) }()
	require.NoError(t, cluster.Start())

	// gRPC client, logged in as the default user.
	grpcClient, closeClient, err := cluster.Client()
	require.NoError(t, err)
	defer closeClient()
	err = grpcClient.LoginIntoNamespace(context.Background(),
		dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace)
	require.NoError(t, err)

	// HTTP client, used below for the backup and restore calls.
	httpClient, err := cluster.HTTPClient()
	require.NoError(t, err)
	err = httpClient.LoginIntoNamespace(dgraphtest.DefaultUser,
		dgraphtest.DefaultPassword, x.GalaxyNamespace)
	require.NoError(t, err)

	require.NoError(t, grpcClient.SetupSchema(testSchema))

	numVectors := 1000
	pred := "project_discription_v"

	// First batch: written before the backup is taken.
	firstRdfs, firstVectors := dgraphtest.GenerateRandomVectors(0, numVectors, 10, pred)
	_, err = grpcClient.Mutate(&api.Mutation{SetNquads: []byte(firstRdfs), CommitNow: true})
	require.NoError(t, err)

	t.Log("taking backup \n")
	require.NoError(t, httpClient.Backup(cluster, false, dgraphtest.DefaultBackupDir))

	// Second batch: written after the backup and before the restore.
	secondRdfs, secondVectors := dgraphtest.GenerateRandomVectors(numVectors, numVectors+300, 10, pred)
	_, err = grpcClient.Mutate(&api.Mutation{SetNquads: []byte(secondRdfs), CommitNow: true})
	require.NoError(t, err)

	t.Log("restoring backup \n")
	require.NoError(t, httpClient.Restore(cluster, dgraphtest.DefaultBackupDir, "", 2, 1))
	require.NoError(t, dgraphtest.WaitForRestore(cluster))

	// Drop and re-add the index several times in a row.
	for round := 0; round < 5; round++ {
		require.NoError(t, grpcClient.SetupSchema(testSchemaWithoutIndex))
		require.NoError(t, grpcClient.SetupSchema(testSchema))
	}

	allVectors := append(firstVectors, secondVectors...)
	allRdfs := firstRdfs + secondRdfs
	testVectorQuery(t, grpcClient, allVectors, allRdfs, pred, numVectors)
}
5 changes: 3 additions & 2 deletions tok/hnsw/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,10 @@ func (ph *persistentHNSW[T]) createEntryAndStartNodes(
err := ph.getVecFromUid(entry, c, vec)
if err != nil || len(*vec) == 0 {
// The entry vector has been deleted. We have to create a new entry vector.
entry, err := ph.PickStartNode(ctx, c, vec)
entry, err := ph.calculateNewEntryVec(ctx, c, vec)
if err != nil {
return 0, []*index.KeyValue{}, err
// No other node exists, go with the new node that has come
return create_edges(inUuid)
}
return create_edges(entry)
}
Expand Down
16 changes: 13 additions & 3 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,12 +549,19 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
errCh <- process(m.Edges[start:end])
}(start, end)
}
// Earlier we were returning as soon as even one thread had an error. We should wait for
// all the transactions to finish. We call txn.Update() when this function exits. This could cause
// a deadlock with runMutation.
var errs error
for i := 0; i < numGo; i++ {
if err := <-errCh; err != nil {
return err
if errs == nil {
errs = errors.New("Got error while running mutation")
}
errs = errors.Wrapf(err, errs.Error())
}
}
return nil
return errs
}

func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error {
Expand Down Expand Up @@ -839,7 +846,10 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
if txn == nil {
return
}
txn.Update()
// If the transaction has failed, we don't need to update it.
if commit != 0 {
txn.Update()
}
// We start with 20 ms, so that we end up waiting 5 mins by the end.
// If there is any transient issue, it should get fixed within that timeframe.
err := x.ExponentialRetry(int(x.Config.MaxRetries),
Expand Down
7 changes: 4 additions & 3 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,9 +642,9 @@ func Timestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
return c.Timestamps(ctx, num)
}

func fillTxnContext(tctx *api.TxnContext, startTs uint64) {
func fillTxnContext(tctx *api.TxnContext, startTs uint64, isErrored bool) {
if txn := posting.Oracle().GetTxn(startTs); txn != nil {
txn.FillContext(tctx, groups().groupId())
txn.FillContext(tctx, groups().groupId(), isErrored)
}
// We do not need to fill linread mechanism anymore, because transaction
// start ts is sufficient to wait for, to achieve lin reads.
Expand Down Expand Up @@ -950,7 +950,8 @@ func (w *grpcWorker) proposeAndWait(ctx context.Context, txnCtx *api.TxnContext,

node := groups().Node
err := node.proposeAndWait(ctx, &pb.Proposal{Mutations: m})
fillTxnContext(txnCtx, m.StartTs)
// When we are filling txn context, we don't need to update latest delta if the transaction has failed.
fillTxnContext(txnCtx, m.StartTs, err != nil)
return err
}

Expand Down