From d5b7d1f85064f81196e53e7f308bff37a9c49ccd Mon Sep 17 00:00:00 2001 From: Kiran RG Date: Fri, 18 Aug 2017 10:55:16 -0700 Subject: [PATCH] cherami-cli: update seal-check to work on specific destination and DLQs (#276) * seal-check: add support to specify destination * add option to query DLQs * cr feedback * make map * fix output * make map * fix lint --- cmd/tools/common/lib.go | 11 ++- tools/common/lib.go | 177 ++++++++++++++++++++++++++++++++-------- 2 files changed, 152 insertions(+), 36 deletions(-) diff --git a/cmd/tools/common/lib.go b/cmd/tools/common/lib.go index ffa86050..77d6f7d0 100644 --- a/cmd/tools/common/lib.go +++ b/cmd/tools/common/lib.go @@ -514,10 +514,9 @@ func SetAdminCommands(commands *[]cli.Command) { Aliases: []string{"e"}, Usage: "show extent ", Flags: []cli.Flag{ - cli.StringFlag{ - Name: "showcg, sc", - Value: "false", - Usage: "show consumer group(false, true), default to false", + cli.BoolFlag{ + Name: "showcg, cg", + Usage: "show consumer group", }, }, Action: func(c *cli.Context) { @@ -853,6 +852,10 @@ func SetAdminCommands(commands *[]cli.Command) { Value: "/", Usage: "only process destinations with prefix", }, + cli.BoolFlag{ + Name: "dlq", + Usage: "query and check corresopnding DLQ destinations", + }, cli.BoolFlag{ Name: "seal", Usage: "seal extents on replica that are not sealed", diff --git a/tools/common/lib.go b/tools/common/lib.go index 7929192a..f47c6fa5 100644 --- a/tools/common/lib.go +++ b/tools/common/lib.go @@ -922,13 +922,13 @@ func ReadDestination(c *cli.Context, serviceName string) { } path := c.Args().First() - showCG := string(c.String("showcg")) + showCG := c.Bool("showcg") desc, err := readDestinationFromMetadata(mClient, path) ExitIfError(err) printDest(desc) // only show cg info if showCG flag is true - if showCG == "true" { + if showCG { // read all the consumer group for this destination, including deleted ones destUUID := desc.GetDestinationUUID() req := &shared.ListConsumerGroupRequest{ @@ -1711,6 +1711,7 @@ func SealConsistencyCheck(c *cli.Context, mClient mcli.Client) { seal := c.Bool("seal") verbose := c.Bool("verbose") veryVerbose := c.Bool("veryverbose") + dlq := c.Bool("dlq") storeClients := newStoreClientCache(mClient) defer storeClients.close() @@ -1722,28 +1723,13 @@ func SealConsistencyCheck(c *cli.Context, mClient mcli.Client) { sealKeyTimestamp = math.MaxInt64 & timestampBitmask ) - reqListDest := &shared.ListDestinationsRequest{ - Prefix: common.StringPtr(prefix), - Limit: common.Int64Ptr(DefaultPageSize), - } - -iterate_listdestinations_pages: - for { - if veryVerbose { - fmt.Printf("querying metadata: ListDestinations(prefix=\"%s\")\n", prefix) - } + checkDests := func(destUUIDs []string) { - respListDest, err := mClient.ListDestinations(reqListDest) - - if err != nil { - fmt.Fprintf(os.Stderr, "ListDestinations error: %v\n", err) - break iterate_listdestinations_pages - } - - for _, desc := range respListDest.GetDestinations() { - - destUUID := desc.GetDestinationUUID() + for _, destUUID := range destUUIDs { + // find all sealed extents for the destination that are "local" -- ie, we skip + // extents that belong to a multi-zone destination, but are not in the "origin" + // zone, because they could potentially be still being replicated. listExtentsStats := &shared.ListExtentsStatsRequest{ DestinationUUID: common.StringPtr(string(destUUID)), Status: shared.ExtentStatusPtr(shared.ExtentStatus_SEALED), @@ -1754,14 +1740,14 @@ iterate_listdestinations_pages: iterate_listextents_pages: for { if veryVerbose { - fmt.Printf("querying metadata: ListExtentsStats(dest=%v status=%v LocalOnly=%v Limit=%v)", + fmt.Printf("querying metadata: ListExtentsStats(dest=%v status=%v LocalOnly=%v Limit=%v)\n", destUUID, shared.ExtentStatus_SEALED, true, DefaultPageSize) } listExtentStatsResult, err1 := mClient.ListExtentsStats(listExtentsStats) if err1 != nil { - fmt.Fprintf(os.Stderr, "ListExtentsStats(dest=%v) error: %v\n", destUUID, err) + fmt.Fprintf(os.Stderr, "ListExtentsStats(dest=%v) error: %v\n", destUUID, err1) break iterate_listextents_pages } @@ -1777,7 +1763,7 @@ iterate_listdestinations_pages: storeClient, err1 := storeClients.get(storeUUID) if err1 != nil { - fmt.Fprintf(os.Stderr, "error getting store client (store=%v): %v\n", storeUUID, err) + fmt.Fprintf(os.Stderr, "error getting store client (store=%v): %v\n", storeUUID, err1) continue iterate_stores } @@ -1812,13 +1798,12 @@ iterate_listdestinations_pages: IsEmpty: extentNotFound || resp.GetAddress() == store.ADDR_BEGIN, } - outputStr, _ := json.Marshal(output) - fmt.Fprintln(os.Stdout, string(outputStr)) - // now seal the extent if seal { - fmt.Printf("sealing extent on replica: %v %v %v", destUUID, extentUUID, storeUUID) + if verbose { + fmt.Printf("sealing extent on replica: %v %v %v\n", destUUID, extentUUID, storeUUID) + } req := store.NewSealExtentRequest() req.ExtentUUID = common.StringPtr(string(extentUUID)) @@ -1832,8 +1817,13 @@ iterate_listdestinations_pages: destUUID, extentUUID, storeUUID, err1) continue iterate_stores } + + output.IsSealed = true } + outputStr, _ := json.Marshal(output) + fmt.Fprintln(os.Stdout, string(outputStr)) + default: if verbose { @@ -1862,12 +1852,135 @@ iterate_listdestinations_pages: listExtentsStats.PageToken = listExtentStatsResult.GetNextPageToken() } } + } + + var getDlqs = func(destUUID string) (dlqs []string) { + + req := &shared.ListConsumerGroupRequest{ + DestinationPath: common.StringPtr(destUUID), + Limit: common.Int64Ptr(DefaultPageSize), + } + + for { + resp, err := mClient.ListConsumerGroups(req) + ExitIfError(err) + + for _, cg := range resp.GetConsumerGroups() { + + if cg.IsSetDeadLetterQueueDestinationUUID() { + dlqs = append(dlqs, cg.GetDeadLetterQueueDestinationUUID()) + } + } + + if len(resp.GetNextPageToken()) == 0 { + return + } + + req.PageToken = resp.GetNextPageToken() + } + } + + var getAllDlqs = func() (dlqs map[string][]string) { + + dlqs = make(map[string][]string) + + if veryVerbose { + fmt.Printf("querying metadata: ListAllConsumerGroups()\n") + } + + req := &shared.ListConsumerGroupRequest{ + Limit: common.Int64Ptr(DefaultPageSize), + } + + var nDlqs int + + for { + resp, err1 := mClient.ListAllConsumerGroups(req) + ExitIfError(err1) + + for _, cg := range resp.GetConsumerGroups() { + + if cg.IsSetDeadLetterQueueDestinationUUID() { + + destUUID := cg.GetDestinationUUID() + dlqs[destUUID] = append(dlqs[destUUID], cg.GetDeadLetterQueueDestinationUUID()) + nDlqs++ + } + } + + if len(resp.GetNextPageToken()) == 0 { + + if veryVerbose { + fmt.Printf("found %d DLQ destinations for %d destinations\n", nDlqs, len(dlqs)) + } + + return + } + + req.PageToken = resp.NextPageToken + } + } + + if len(c.Args()) > 0 { + + desc, err := mClient.ReadDestination(&shared.ReadDestinationRequest{ + Path: common.StringPtr(c.Args()[0]), + }) + + ExitIfError(err) + + var destUUIDs []string + destUUIDs = append(destUUIDs, desc.GetDestinationUUID()) + + if dlq { // check DLQ destinations + destUUIDs = append(destUUIDs, getDlqs(desc.GetDestinationUUID())...) + } + + checkDests(destUUIDs) + + } else { + + var dlqs map[string][]string + + if dlq { // check DLQ destinations + dlqs = getAllDlqs() + } - if len(respListDest.GetNextPageToken()) == 0 { - break iterate_listdestinations_pages + reqListDest := &shared.ListDestinationsRequest{ + Prefix: common.StringPtr(prefix), + Limit: common.Int64Ptr(DefaultPageSize), } - reqListDest.PageToken = respListDest.GetNextPageToken() + iterate_listdestinations_pages: + for { + if veryVerbose { + fmt.Printf("querying metadata: ListDestinations(prefix=\"%s\")\n", prefix) + } + + respListDest, err := mClient.ListDestinations(reqListDest) + + if err != nil { + fmt.Fprintf(os.Stderr, "ListDestinations error: %v\n", err) + break iterate_listdestinations_pages + } + + for _, desc := range respListDest.GetDestinations() { + + destUUIDs := []string{desc.GetDestinationUUID()} + + if dlq { // check DLQ destinations + destUUIDs = append(destUUIDs, dlqs[destUUIDs[0]]...) + } + + checkDests(destUUIDs) + } + + if len(respListDest.GetNextPageToken()) == 0 { + break iterate_listdestinations_pages + } + + reqListDest.PageToken = respListDest.GetNextPageToken() + } } }