Skip to content

Commit

Permalink
dgraphtest: add workaround for multiple groot issue in export-import
Browse files Browse the repository at this point in the history
When we perform an export and import into the cluster, this ends
up creating two groot users in the cluster. This PR implements
a workaround so that two groot users are not created to begin with.
With this workaround, now we can also run upgrade tests using
ExportImport upgrade strategy.
  • Loading branch information
mangalaman93 committed Jul 5, 2023
1 parent 3de01e4 commit dcd544f
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 25 deletions.
8 changes: 8 additions & 0 deletions dgraphtest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ var AllUpgradeCombos = []UpgradeCombo{
// ea1cb5f35650c54419483d6129460b4b25cbb418
{"v21.03.1", "v23.0.0", BackupRestore},
// b17395d33801bf36235de378d8560f61f4457d2b
{"v21.03.2", "v23.0.0", ExportImport},
{"v21.03.2", "v23.0.0", BackupRestore},
// c36206a5c7062efef797f62bd797625a2d0d2a27
{"v22.0.0", "v23.0.0", BackupRestore},
// 7fb5291a984af45d4639d370c290939152c79612
{"v22.0.1", "v23.0.0", BackupRestore},
// 7b18a6bec95731201d94142ec86a4fde035fb7e0
{"v22.0.2", "v23.0.0", ExportImport},
{"v22.0.2", "v23.0.0", BackupRestore},
// CLOUD VERSIONS
// v21.03.0-48-ge3d3e6290
Expand All @@ -66,7 +68,13 @@ var AllUpgradeCombos = []UpgradeCombo{
// v21.03.0-82-g83c9cbedc
{"83c9cbedc", "v23.0.0", BackupRestore},
// v21.03.0-84-gc5862ae2a
{"c5862ae2a", "v23.0.0", ExportImport},
{"c5862ae2a", "v23.0.0", BackupRestore},
// v20.11
{"v20.11.0", "v23.0.0", ExportImport},
{"v20.11.1", "v23.0.0", ExportImport},
{"v20.11.2", "v23.0.0", ExportImport},
{"v20.11.3", "v23.0.0", ExportImport},
}

type ClusterConfig struct {
Expand Down
224 changes: 204 additions & 20 deletions dgraphtest/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package dgraphtest

import (
"archive/tar"
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"log"
Expand All @@ -33,12 +36,19 @@ import (
"github.com/dgraph-io/dgraph/x"
)

const (
groupOneRdfFile = "g01.rdf"
groupOneRdfGzFile = "g01.rdf.gz"
)

// LiveOpts are options that are used for running live loader.
type LiveOpts struct {
RdfFiles []string
SchemaFiles []string
GqlSchemaFiles []string
}

// readGzFile reads the given file from disk completely and returns the content.
func readGzFile(sf string) ([]byte, error) {
fd, err := os.Open(sf)
if err != nil {
Expand All @@ -52,7 +62,7 @@ func readGzFile(sf string) ([]byte, error) {

gr, err := gzip.NewReader(fd)
if err != nil {
return nil, errors.Wrapf(err, "error creating a gzip reader for file [%v]", sf)
return nil, errors.Wrapf(err, "error creating gzip reader for file [%v]", sf)
}
defer func() {
if err := gr.Close(); err != nil {
Expand All @@ -62,20 +72,27 @@ func readGzFile(sf string) ([]byte, error) {

data, err := io.ReadAll(gr)
if err != nil {
return nil, errors.Wrapf(err, "error reading file content [%v]", sf)
return nil, errors.Wrapf(err, "error reading file [%v]", sf)
}
return data, nil
}

// setDQLSchema updates the DQL schema in the dgraph cluster to the
// schema present in all the files in the export as passed in args.
func setDQLSchema(c *LocalCluster, files []string) error {
gc, cleanup, err := c.Client()
if err != nil {
return errors.WithStack(err)
return errors.Wrap(err, "error creating grpc client")
}
defer cleanup()
if err := gc.LoginIntoNamespace(context.Background(),
DefaultUser, DefaultPassword, x.GalaxyNamespace); err != nil {
return errors.WithStack(err)

if c.conf.acl {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err := gc.LoginIntoNamespace(ctx, DefaultUser, DefaultPassword, x.GalaxyNamespace)
if err != nil {
return errors.Wrap(err, "error login to default namespace")
}
}

for _, sf := range files {
Expand All @@ -84,43 +101,64 @@ func setDQLSchema(c *LocalCluster, files []string) error {
return err
}
if err := gc.SetupSchema(string(data)); err != nil {
return errors.WithStack(err)
return errors.Wrapf(err, "error setting up DQL schema [%v]", string(data))
}
}
return nil
}

// setGraphQLSchema updates the graphql schema in the dgraph cluster with
// the schema present in all the files in the export as passed in args.
func setGraphQLSchema(c *LocalCluster, files []string) error {
hc, err := c.HTTPClient()
if err != nil {
return errors.WithStack(err)
}
if err := hc.LoginIntoNamespace(DefaultUser, DefaultPassword, x.GalaxyNamespace); err != nil {
return errors.WithStack(err)
return errors.Wrap(err, "error creating HTTP client")
}

for _, sf := range files {
data, err := readGzFile(sf)
if err != nil {
return err
}
// if there is no GraphQL schema in the cluster,
// the GQL file only has empty [].
// if there is no GraphQL schema in the cluster, the GQL
// file only has empty []. we can skip these files.
if len(data) < 10 {
continue
}
if err := hc.UpdateGQLSchema(string(data)); err != nil {
return errors.WithStack(err)

var nsToSch []struct {
Namespace uint64 `json:"namespace"`
Schema string `json:"schema"`
}
if err := json.Unmarshal(data, &nsToSch); err != nil {
return errors.Wrapf(err, "error parsing gql schema file content [%v]", string(data))
}
for _, nss := range nsToSch {
if nss.Schema == "" {
continue
}

if c.conf.acl {
err := hc.LoginIntoNamespace(DefaultUser, DefaultPassword, nss.Namespace)
if err != nil {
return errors.Wrap(err, "error login into default namespace")
}
}
if err := hc.UpdateGQLSchema(nss.Schema); err != nil {
return errors.Wrapf(err, "error updating GQL schema to: [%v]", nss.Schema)
}
}
}
return nil
}

// LiveLoad runs the live loader with provided options
func (c *LocalCluster) LiveLoad(opts LiveOpts) error {
log.Printf("[INFO] updating DQL schema from [%v]", strings.Join(opts.SchemaFiles, " "))
if err := setDQLSchema(c, opts.SchemaFiles); err != nil {
return err
}
log.Printf("[INFO] updating GraphQL schema from [%v]", strings.Join(opts.GqlSchemaFiles, " "))
if err := setGraphQLSchema(c, opts.GqlSchemaFiles); err != nil {
return err
}
Expand All @@ -129,13 +167,13 @@ func (c *LocalCluster) LiveLoad(opts LiveOpts) error {
for i, aa := range c.alphas {
url, err := aa.alphaURL(c)
if err != nil {
return errors.Wrapf(err, "error finding URL to %vth alpha", i)
return errors.Wrapf(err, "error finding URL for alpha #%v", i)
}
alphaURLs = append(alphaURLs, url)
}
zeroURL, err := c.zeros[0].zeroURL(c)
if err != nil {
return errors.Wrap(err, "error finding URL to 0th zero")
return errors.Wrap(err, "error finding URL of first zero")
}

args := []string{
Expand Down Expand Up @@ -163,6 +201,133 @@ func (c *LocalCluster) LiveLoad(opts LiveOpts) error {
return nil
}

// findGrootAndGuardians returns the UIDs of groot user and guardians group
func findGrootAndGuardians(c *LocalCluster) (string, string, error) {
gc, cleanup, err := c.Client()
if err != nil {
return "", "", errors.Wrapf(err, "error creating grpc client")
}
defer cleanup()

if c.conf.acl {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err = gc.LoginIntoNamespace(ctx, DefaultUser, DefaultPassword, x.GalaxyNamespace)
if err != nil {
return "", "", errors.Wrapf(err, "error logging in as groot")
}
}

query := `{
q(func: eq(dgraph.xid, "groot")){
uid
dgraph.user.group @filter(eq(dgraph.xid, "guardians")) {
uid
}
}
}`
resp, err := gc.Query(query)
if err != nil {
return "", "", errors.Wrapf(err, "error querying groot & guardians UIDs")
}
var result struct {
Q []struct {
Uid string
Groups []struct {
Uid string
} `json:"dgraph.user.group"`
}
}
if err := json.Unmarshal(resp.Json, &result); err != nil {
return "", "", errors.Wrapf(err, "error unmarshalling resp: [%v]", string(resp.Json))
}
if len(result.Q) != 1 || result.Q[0].Uid == "" ||
len(result.Q[0].Groups) != 1 || result.Q[0].Groups[0].Uid == "" {
return "", "", errors.Errorf("unable to find groot & guardians, resp: [%+v]", resp)
}
return result.Q[0].Uid, result.Q[0].Groups[0].Uid, nil
}

// modifyACLEntries replaces groot's and guardians' UIDs
// in the exported files to the UIDs in this dgraph cluster.
func modifyACLEntries(c *LocalCluster, r io.Reader) (io.Reader, error) {
grootUIDNew, guardiansUIDNew, err := findGrootAndGuardians(c)
if err != nil {
return nil, err
}
grootUIDNew = fmt.Sprintf("<%v>", grootUIDNew)
guardiansUIDNew = fmt.Sprintf("<%v>", guardiansUIDNew)

gr, err := gzip.NewReader(r)
if err != nil {
return nil, errors.Wrap(err, "error creating gzip reader for RDF file")
}
defer func() {
if err := gr.Close(); err != nil {
log.Printf("[WARNING] error closing gzip reader for RDF file: %v", err)
}
}()
data, err := io.ReadAll(gr)
if err != nil {
return nil, errors.Wrap(err, "error reading file content for RDF file")
}

// We need to find UIDs of the guardians group and groot node and replace them with
// the right UID that the new cluster has. Because,all of the reserved predicates are
// assigned to group 1 and we only have one rdf.gz file per group in the export, we
// can just do it one io.Reader (flie) at a time as we get in this function. The way
// we find the UIDs is through searching for following byte sequences.
// <dgraph.xid> "guardians"
// <dgraph.xid> "groot"
findUID := func(sub []byte) ([]byte, error) {
i := bytes.Index(data, sub)
if i == -1 {
return nil, errors.Errorf("unable to find data in RDF file: [%v]", string(sub))
}
var start, end int
for i = i - 1; i >= 0; i-- {
if data[i] == byte('>') {
end = i
} else if data[i] == byte('<') {
start = i
break
}
}
return data[start : end+1], nil
}
guardiansUID, err := findUID([]byte(`<dgraph.xid> "guardians"`))
if err != nil {
return nil, err
}
grootUID, err := findUID([]byte(`<dgraph.xid> "groot"`))
if err != nil {
return nil, err
}

// we should only replace if RDF line is for a reserved type
lines := bytes.Split(data, []byte{'\n'})
for i, line := range lines {
if bytes.Contains(line, []byte("<dgraph.")) {
line = bytes.ReplaceAll(line, guardiansUID, []byte(guardiansUIDNew))
line = bytes.ReplaceAll(line, grootUID, []byte(grootUIDNew))
lines[i] = line
}
}
data = bytes.Join(lines, []byte{'\n'})

buf := bytes.NewBuffer(nil)
zw := gzip.NewWriter(buf)
defer func() {
if err := zw.Close(); err != nil {
log.Printf("[WARNING] error closing zip writer: %v", err)
}
}()
if _, err := zw.Write(data); err != nil {
return nil, errors.Wrap(err, "error writing modified rdf file to zip")
}
return bufio.NewReader(buf), nil
}

// LiveLoadFromExport runs the live loader from the output of dgraph export
// The exportDir is the directory present inside the container. This function
// first copies all the files on the host and then runs the live loader.
Expand All @@ -173,8 +338,13 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
if err != nil {
return errors.Wrap(err, "error creating temp dir for exported data")
}
defer func() {
if err := os.RemoveAll(exportDirHost); err != nil {
log.Printf("[WARNING] error removing export copy on the host: %v", err)
}
}()

// First, we need to copy the exported data from the container to host
// we need to copy the exported data from the container to host
ts, _, err := c.dcli.CopyFromContainer(ctx, c.alphas[0].cid(), exportDir)
if err != nil {
return errors.Wrapf(err, "error copying export dir from container [%v]", c.alphas[0].cname())
Expand Down Expand Up @@ -212,7 +382,7 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
return errors.Errorf("found unexpected file in export: %v", fileName)
}

fd, err := os.Create(hostFile) //nolint: G305
fd, err := os.Create(hostFile)
if err != nil {
return errors.Wrapf(err, "error creating file [%v]", hostFile)
}
Expand All @@ -221,7 +391,21 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
log.Printf("[WARNING] error closing file while docker cp: [%+v]", header)
}
}()
if _, err := io.Copy(fd, tr); err != nil {

// Because we export UIDs in the export, and groot and guardians nodes already exist
// in the graph in any Dgraph cluster, we need to fix the exported data to use the UIDs
// of the new cluster for both groot and guardians nodes. These UIDs will only be used
// in the export file of group 1 because all reserved predicates are always in group 1.
var fromReader io.Reader = tr
if fileName == groupOneRdfGzFile {
r, err := modifyACLEntries(c, tr)
if err != nil {
return err
}
fromReader = r
}

if _, err := io.Copy(fd, fromReader); err != nil {
return errors.Wrapf(err, "error writing to [%v] from: [%+v]", fd.Name(), header)
}
}
Expand Down
Loading

0 comments on commit dcd544f

Please sign in to comment.