Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Restore): Async restore operations. #5704

Merged
merged 18 commits into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions ee/acl/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1973,10 +1973,7 @@ func TestGuardianOnlyAccessForAdminEndpoints(t *testing.T) {
query: `
mutation {
restore(input: {location: "", backupId: "", encryptionKeyFile: ""}) {
response {
code
message
}
code
}
}`,
queryName: "restore",
Expand All @@ -1986,7 +1983,7 @@ func TestGuardianOnlyAccessForAdminEndpoints(t *testing.T) {
" manifests: The path \"\" does not exist or it is inaccessible.",
Locations: []x.Location{{Line: 3, Column: 8}},
}},
guardianData: `{"restore": null}`,
guardianData: `{"restore": {"code": "Failure"}}`,
},
{
name: "getGQLSchema has guardian auth",
Expand Down
12 changes: 8 additions & 4 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,11 @@ var (
resolve.GuardianAuthMW4Mutation,
}
adminQueryMWConfig = map[string]resolve.QueryMiddlewares{
"health": {resolve.IpWhitelistingMW4Query}, // dgraph handles Guardian auth for health
"state": {resolve.IpWhitelistingMW4Query}, // dgraph handles Guardian auth for state
"config": commonAdminQueryMWs,
"listBackups": commonAdminQueryMWs,
"health": {resolve.IpWhitelistingMW4Query}, // dgraph handles Guardian auth for health
"state": {resolve.IpWhitelistingMW4Query}, // dgraph handles Guardian auth for state
"config": commonAdminQueryMWs,
"listBackups": commonAdminQueryMWs,
"restoreStatus": commonAdminQueryMWs,
// not applying ip whitelisting to keep it in sync with /alter
"getGQLSchema": {resolve.GuardianAuthMW4Query},
// for queries and mutations related to User/Group, dgraph handles Guardian auth,
Expand Down Expand Up @@ -493,6 +494,9 @@ func newAdminResolverFactory() resolve.ResolverFactory {
WithQueryResolver("listBackups", func(q schema.Query) resolve.QueryResolver {
return resolve.QueryResolverFunc(resolveListBackups)
}).
WithQueryResolver("restoreStatus", func(q schema.Query) resolve.QueryResolver {
return resolve.QueryResolverFunc(resolveRestoreStatus)
}).
WithMutationResolver("updateGQLSchema", func(m schema.Mutation) resolve.MutationResolver {
return resolve.MutationResolverFunc(
func(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
Expand Down
35 changes: 33 additions & 2 deletions graphql/admin/endpoints_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,21 @@ const adminTypes = `
}

type RestorePayload {
response: Response
"""
A short string indicating whether the restore operation was successfully scheduled.
The status of the operation can be queried using the restoreStatus endpoint.
"""
code: String

"""
Includes the error message if the operation failed.
"""
message: String

"""
The unique ID that can be used to query the status of the restore operation.
"""
restoreId: Int
}

input ListBackupsInput {
Expand Down Expand Up @@ -209,6 +223,18 @@ const adminTypes = `
"""
type: String
}

type RestoreStatus {
"""
The status of the restore operation. One of UNKNOWN, IN_PROGRESS, OK, or ERR.
"""
status: String!

"""
A list of error messages if the restore operation failed.
"""
errors: [String]
}

type LoginResponse {

Expand Down Expand Up @@ -465,4 +491,9 @@ const adminQueries = `
"""
Get the information about the backups at a given location.
"""
listBackups(input: ListBackupsInput!) : [Manifest]`
listBackups(input: ListBackupsInput!) : [Manifest]

"""
Get information about a restore operation.
"""
restoreStatus(restoreId: Int!) : RestoreStatus`
20 changes: 15 additions & 5 deletions graphql/admin/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type restoreInput struct {
}

func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {

input, err := getRestoreInput(m)
if err != nil {
return resolve.EmptyResult(m, err), false
Expand All @@ -62,15 +61,26 @@ func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved,
VaultSecretidFile: input.VaultSecretIDFile,
VaultPath: input.VaultPath,
VaultField: input.VaultField,
VaultFormat: input.VaultFormat,
VaultFormat: input.VaultFormat,
}
err = worker.ProcessRestoreRequest(context.Background(), &req)
restoreId, err := worker.ProcessRestoreRequest(context.Background(), &req)
if err != nil {
return resolve.EmptyResult(m, err), false
worker.DeleteRestoreId(restoreId)
return &resolve.Resolved{
Data: map[string]interface{}{m.Name(): map[string]interface{}{
"code": "Failure",
}},
Field: m,
Err: schema.GQLWrapLocationf(err, m.Location(), "resolving %s failed", m.Name()),
}, false
}

return &resolve.Resolved{
Data: map[string]interface{}{m.Name(): response("Success", "Restore completed.")},
Data: map[string]interface{}{m.Name(): map[string]interface{}{
"code": "Success",
"message": "Restore operation started.",
"restoreId": restoreId,
}},
Field: m,
}, true
}
Expand Down
82 changes: 82 additions & 0 deletions graphql/admin/restore_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2020 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package admin

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/dgraph-io/dgraph/graphql/resolve"
	"github.com/dgraph-io/dgraph/graphql/schema"
	"github.com/dgraph-io/dgraph/worker"
)

// restoreStatus is the JSON-serializable form of a restore operation's status.
// convertStatus builds it from a worker.RestoreStatus, and resolveRestoreStatus
// marshals it to produce the data returned for the restoreStatus admin query.
type restoreStatus struct {
	// Status is copied verbatim from worker.RestoreStatus.Status.
	// NOTE(review): the admin schema declares this field as String!, yet
	// omitempty drops the key when the value is empty — confirm the worker
	// never reports an empty status.
	Status string `json:"status,omitempty"`
	// Errors holds the message of each error attached to the operation. The
	// key is omitted from the JSON output when the slice is empty.
	Errors []string `json:"errors,omitempty"`
}

// unknownStatus builds the result returned when the state of a restore
// operation cannot be determined. The data always reports the status
// "UNKNOWN" under the query's name; err is wrapped with the query's location
// and name and attached as the result's error.
func unknownStatus(q schema.Query, err error) *resolve.Resolved {
	payload := map[string]interface{}{"status": "UNKNOWN"}
	data := map[string]interface{}{q.Name(): payload}
	wrapped := schema.GQLWrapLocationf(err, q.Location(), "resolving %s failed", q.Name())

	return &resolve.Resolved{Data: data, Field: q, Err: wrapped}
}

// resolveRestoreStatus resolves the restoreStatus admin query. It reads the
// restoreId argument, asks the worker for the status of that restore
// operation, and returns the converted status as a generic map stored under
// the query's name.
//
// Every failure path — a restoreId argument that does not hold an int64, an
// error from the worker, a nil status, or a JSON conversion error — returns
// the "UNKNOWN" result built by unknownStatus.
func resolveRestoreStatus(ctx context.Context, q schema.Query) *resolve.Resolved {
	// Use the checked form of the type assertion: the unchecked form panics
	// when the argument does not hold an int64.
	arg := q.ArgValue("restoreId")
	restoreID, ok := arg.(int64)
	if !ok {
		return unknownStatus(q, fmt.Errorf("restoreId must be an integer, got %T", arg))
	}

	status, err := worker.ProcessRestoreStatus(ctx, int(restoreID))
	if err != nil {
		return unknownStatus(q, err)
	}
	if status == nil {
		// The worker returned neither a status nor an error, so there is no
		// underlying error to attach to the UNKNOWN result.
		return unknownStatus(q, nil)
	}

	// Round-trip through JSON to turn the typed struct into the generic map
	// that is returned as the query's data; the struct's omitempty tags decide
	// which keys end up in the map.
	b, err := json.Marshal(convertStatus(status))
	if err != nil {
		return unknownStatus(q, err)
	}
	result := make(map[string]interface{})
	if err := json.Unmarshal(b, &result); err != nil {
		return unknownStatus(q, err)
	}

	return &resolve.Resolved{
		Data:  map[string]interface{}{q.Name(): result},
		Field: q,
	}
}

// convertStatus translates the worker's restore status into the restoreStatus
// shape used by this package: the status string is copied as is and every
// error is replaced by its message. A nil status converts to nil.
func convertStatus(status *worker.RestoreStatus) *restoreStatus {
	if status == nil {
		return nil
	}

	// Pre-size the slice; it stays non-nil even when there are no errors.
	messages := make([]string, 0, len(status.Errors))
	for _, e := range status.Errors {
		messages = append(messages, e.Error())
	}
	return &restoreStatus{Status: status.Status, Errors: messages}
}
57 changes: 46 additions & 11 deletions systest/online-restore/online_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,13 @@ import (
"github.com/dgraph-io/dgraph/testutil"
)

func sendRestoreRequest(t *testing.T, backupId string, dg *dgo.Dgraph) {
func sendRestoreRequest(t *testing.T, backupId string) int {
restoreRequest := fmt.Sprintf(`mutation restore() {
restore(input: {location: "/data/backup", backupId: "%s",
encryptionKeyFile: "/data/keys/enc_key"}) {
response {
code
message
}
code
message
restoreId
}
}`, backupId)

Expand All @@ -58,8 +57,42 @@ func sendRestoreRequest(t *testing.T, backupId string, dg *dgo.Dgraph) {
resp, err := http.Post(adminUrl, "application/json", bytes.NewBuffer(b))
require.NoError(t, err)
buf, err := ioutil.ReadAll(resp.Body)
bufString := string(buf)
require.NoError(t, err)
require.Contains(t, bufString, "Success")
jsonMap := make(map[string]map[string]interface{})
require.NoError(t, json.Unmarshal([]byte(bufString), &jsonMap))
restoreId := int(jsonMap["data"]["restore"].(map[string]interface{})["restoreId"].(float64))
require.NotEqual(t, "", restoreId)
return restoreId
}

func waitForRestore(t *testing.T, restoreId int, dg *dgo.Dgraph) {
query := fmt.Sprintf(`query status() {
restoreStatus(restoreId: %d) {
status
errors
}
}`, restoreId)
adminUrl := "http://localhost:8180/admin"
params := testutil.GraphQLParams{
Query: query,
}
b, err := json.Marshal(params)
require.NoError(t, err)
require.Contains(t, string(buf), "Restore completed.")

for i := 0; i < 15; i++ {
resp, err := http.Post(adminUrl, "application/json", bytes.NewBuffer(b))
require.NoError(t, err)
buf, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
sbuf := string(buf)
if strings.Contains(sbuf, "OK") {
return
}
time.Sleep(time.Second)
}
require.True(t, false, "restore operation did not complete after max number of retries")

// Wait for the client to exit draining mode. This is needed because the client might
// be connected to a follower and might be behind the leader in applying the restore.
Expand Down Expand Up @@ -179,7 +212,8 @@ func TestBasicRestore(t *testing.T) {
ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

sendRestoreRequest(t, "youthful_rhodes3", dg)
restoreId := sendRestoreRequest(t, "youthful_rhodes3")
waitForRestore(t, restoreId, dg)
runQueries(t, dg, false)
runMutations(t, dg)
}
Expand All @@ -194,12 +228,14 @@ func TestMoveTablets(t *testing.T) {
ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

sendRestoreRequest(t, "youthful_rhodes3", dg)
restoreId := sendRestoreRequest(t, "youthful_rhodes3")
waitForRestore(t, restoreId, dg)
runQueries(t, dg, false)

// Send another restore request with a different backup. This backup has some of the
// same predicates as the previous one but they are stored in different groups.
sendRestoreRequest(t, "blissful_hermann1", dg)
restoreId = sendRestoreRequest(t, "blissful_hermann1")
waitForRestore(t, restoreId, dg)

resp, err := dg.NewTxn().Query(context.Background(), `{
q(func: has(name), orderasc: name) {
Expand All @@ -226,10 +262,9 @@ func TestInvalidBackupId(t *testing.T) {
restoreRequest := `mutation restore() {
restore(input: {location: "/data/backup", backupId: "bad-backup-id",
encryptionKeyFile: "/data/keys/enc_key"}) {
response {
code
message
}
restoreId
}
}`

Expand Down
Loading