Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
cherami-cli: update seal-check to work on specific destination and DL…
Browse files Browse the repository at this point in the history
…Qs (#276)

* seal-check: add support to specify destination

* add option to query DLQs

* cr feedback

* make map

* fix output

* make map

* fix lint
  • Loading branch information
Kiran RG authored Aug 18, 2017
1 parent 0e6a2b9 commit d5b7d1f
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 36 deletions.
11 changes: 7 additions & 4 deletions cmd/tools/common/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,10 +514,9 @@ func SetAdminCommands(commands *[]cli.Command) {
Aliases: []string{"e"},
Usage: "show extent <extent_uuid>",
Flags: []cli.Flag{
cli.StringFlag{
Name: "showcg, sc",
Value: "false",
Usage: "show consumer group(false, true), default to false",
cli.BoolFlag{
Name: "showcg, cg",
Usage: "show consumer group",
},
},
Action: func(c *cli.Context) {
Expand Down Expand Up @@ -853,6 +852,10 @@ func SetAdminCommands(commands *[]cli.Command) {
Value: "/",
Usage: "only process destinations with prefix",
},
cli.BoolFlag{
Name: "dlq",
Usage: "query and check corresopnding DLQ destinations",
},
cli.BoolFlag{
Name: "seal",
Usage: "seal extents on replica that are not sealed",
Expand Down
177 changes: 145 additions & 32 deletions tools/common/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,13 +922,13 @@ func ReadDestination(c *cli.Context, serviceName string) {
}

path := c.Args().First()
showCG := string(c.String("showcg"))
showCG := c.Bool("showcg")
desc, err := readDestinationFromMetadata(mClient, path)
ExitIfError(err)
printDest(desc)

// only show cg info if showCG flag is true
if showCG == "true" {
if showCG {
// read all the consumer group for this destination, including deleted ones
destUUID := desc.GetDestinationUUID()
req := &shared.ListConsumerGroupRequest{
Expand Down Expand Up @@ -1711,6 +1711,7 @@ func SealConsistencyCheck(c *cli.Context, mClient mcli.Client) {
seal := c.Bool("seal")
verbose := c.Bool("verbose")
veryVerbose := c.Bool("veryverbose")
dlq := c.Bool("dlq")

storeClients := newStoreClientCache(mClient)
defer storeClients.close()
Expand All @@ -1722,28 +1723,13 @@ func SealConsistencyCheck(c *cli.Context, mClient mcli.Client) {
sealKeyTimestamp = math.MaxInt64 & timestampBitmask
)

reqListDest := &shared.ListDestinationsRequest{
Prefix: common.StringPtr(prefix),
Limit: common.Int64Ptr(DefaultPageSize),
}

iterate_listdestinations_pages:
for {
if veryVerbose {
fmt.Printf("querying metadata: ListDestinations(prefix=\"%s\")\n", prefix)
}
checkDests := func(destUUIDs []string) {

respListDest, err := mClient.ListDestinations(reqListDest)

if err != nil {
fmt.Fprintf(os.Stderr, "ListDestinations error: %v\n", err)
break iterate_listdestinations_pages
}

for _, desc := range respListDest.GetDestinations() {

destUUID := desc.GetDestinationUUID()
for _, destUUID := range destUUIDs {

// find all sealed extents for the destination that are "local" -- ie, we skip
// extents that belong to a multi-zone destination, but are not in the "origin"
// zone, because they could potentially be still being replicated.
listExtentsStats := &shared.ListExtentsStatsRequest{
DestinationUUID: common.StringPtr(string(destUUID)),
Status: shared.ExtentStatusPtr(shared.ExtentStatus_SEALED),
Expand All @@ -1754,14 +1740,14 @@ iterate_listdestinations_pages:
iterate_listextents_pages:
for {
if veryVerbose {
fmt.Printf("querying metadata: ListExtentsStats(dest=%v status=%v LocalOnly=%v Limit=%v)",
fmt.Printf("querying metadata: ListExtentsStats(dest=%v status=%v LocalOnly=%v Limit=%v)\n",
destUUID, shared.ExtentStatus_SEALED, true, DefaultPageSize)
}

listExtentStatsResult, err1 := mClient.ListExtentsStats(listExtentsStats)

if err1 != nil {
fmt.Fprintf(os.Stderr, "ListExtentsStats(dest=%v) error: %v\n", destUUID, err)
fmt.Fprintf(os.Stderr, "ListExtentsStats(dest=%v) error: %v\n", destUUID, err1)
break iterate_listextents_pages
}

Expand All @@ -1777,7 +1763,7 @@ iterate_listdestinations_pages:
storeClient, err1 := storeClients.get(storeUUID)

if err1 != nil {
fmt.Fprintf(os.Stderr, "error getting store client (store=%v): %v\n", storeUUID, err)
fmt.Fprintf(os.Stderr, "error getting store client (store=%v): %v\n", storeUUID, err1)
continue iterate_stores
}

Expand Down Expand Up @@ -1812,13 +1798,12 @@ iterate_listdestinations_pages:
IsEmpty: extentNotFound || resp.GetAddress() == store.ADDR_BEGIN,
}

outputStr, _ := json.Marshal(output)
fmt.Fprintln(os.Stdout, string(outputStr))

// now seal the extent
if seal {

fmt.Printf("sealing extent on replica: %v %v %v", destUUID, extentUUID, storeUUID)
if verbose {
fmt.Printf("sealing extent on replica: %v %v %v\n", destUUID, extentUUID, storeUUID)
}

req := store.NewSealExtentRequest()
req.ExtentUUID = common.StringPtr(string(extentUUID))
Expand All @@ -1832,8 +1817,13 @@ iterate_listdestinations_pages:
destUUID, extentUUID, storeUUID, err1)
continue iterate_stores
}

output.IsSealed = true
}

outputStr, _ := json.Marshal(output)
fmt.Fprintln(os.Stdout, string(outputStr))

default:

if verbose {
Expand Down Expand Up @@ -1862,12 +1852,135 @@ iterate_listdestinations_pages:
listExtentsStats.PageToken = listExtentStatsResult.GetNextPageToken()
}
}
}

var getDlqs = func(destUUID string) (dlqs []string) {

req := &shared.ListConsumerGroupRequest{
DestinationPath: common.StringPtr(destUUID),
Limit: common.Int64Ptr(DefaultPageSize),
}

for {
resp, err := mClient.ListConsumerGroups(req)
ExitIfError(err)

for _, cg := range resp.GetConsumerGroups() {

if cg.IsSetDeadLetterQueueDestinationUUID() {
dlqs = append(dlqs, cg.GetDeadLetterQueueDestinationUUID())
}
}

if len(resp.GetNextPageToken()) == 0 {
return
}

req.PageToken = resp.GetNextPageToken()
}
}

var getAllDlqs = func() (dlqs map[string][]string) {

dlqs = make(map[string][]string)

if veryVerbose {
fmt.Printf("querying metadata: ListAllConsumerGroups()\n")
}

req := &shared.ListConsumerGroupRequest{
Limit: common.Int64Ptr(DefaultPageSize),
}

var nDlqs int

for {
resp, err1 := mClient.ListAllConsumerGroups(req)
ExitIfError(err1)

for _, cg := range resp.GetConsumerGroups() {

if cg.IsSetDeadLetterQueueDestinationUUID() {

destUUID := cg.GetDestinationUUID()
dlqs[destUUID] = append(dlqs[destUUID], cg.GetDeadLetterQueueDestinationUUID())
nDlqs++
}
}

if len(resp.GetNextPageToken()) == 0 {

if veryVerbose {
fmt.Printf("found %d DLQ destinations for %d destinations\n", nDlqs, len(dlqs))
}

return
}

req.PageToken = resp.NextPageToken
}
}

if len(c.Args()) > 0 {

desc, err := mClient.ReadDestination(&shared.ReadDestinationRequest{
Path: common.StringPtr(c.Args()[0]),
})

ExitIfError(err)

var destUUIDs []string
destUUIDs = append(destUUIDs, desc.GetDestinationUUID())

if dlq { // check DLQ destinations
destUUIDs = append(destUUIDs, getDlqs(desc.GetDestinationUUID())...)
}

checkDests(destUUIDs)

} else {

var dlqs map[string][]string

if dlq { // check DLQ destinations
dlqs = getAllDlqs()
}

if len(respListDest.GetNextPageToken()) == 0 {
break iterate_listdestinations_pages
reqListDest := &shared.ListDestinationsRequest{
Prefix: common.StringPtr(prefix),
Limit: common.Int64Ptr(DefaultPageSize),
}

reqListDest.PageToken = respListDest.GetNextPageToken()
iterate_listdestinations_pages:
for {
if veryVerbose {
fmt.Printf("querying metadata: ListDestinations(prefix=\"%s\")\n", prefix)
}

respListDest, err := mClient.ListDestinations(reqListDest)

if err != nil {
fmt.Fprintf(os.Stderr, "ListDestinations error: %v\n", err)
break iterate_listdestinations_pages
}

for _, desc := range respListDest.GetDestinations() {

destUUIDs := []string{desc.GetDestinationUUID()}

if dlq { // check DLQ destinations
destUUIDs = append(destUUIDs, dlqs[destUUIDs[0]]...)
}

checkDests(destUUIDs)
}

if len(respListDest.GetNextPageToken()) == 0 {
break iterate_listdestinations_pages
}

reqListDest.PageToken = respListDest.GetNextPageToken()
}
}
}

Expand Down

0 comments on commit d5b7d1f

Please sign in to comment.