Skip to content

Commit

Permalink
routes: remove ok/error_message (#2128)
Browse files Browse the repository at this point in the history
We have a mix of ok: false & returning errors,
now that customers are using API this is negatively impacting their UX
(not to mention our own struggles)

Consolidate on consistently returning HTTP errors,
rather than 200 OK with ok: false
  • Loading branch information
serprex authored Oct 9, 2024
1 parent fab4786 commit 8ae44d3
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 261 deletions.
50 changes: 15 additions & 35 deletions flow/cmd/custom_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"errors"
"fmt"
"log/slog"

Expand All @@ -17,69 +18,52 @@ const (
func (h *FlowRequestHandler) CustomSyncFlow(
ctx context.Context, req *protos.CreateCustomSyncRequest,
) (*protos.CreateCustomSyncResponse, error) {
errResponse := &protos.CreateCustomSyncResponse{
FlowJobName: req.FlowJobName,
NumberOfSyncs: 0,
ErrorMessage: "error while processing request",
Ok: false,
}

// ---- REQUEST VALIDATION ----
if req.FlowJobName == "" {
errResponse.ErrorMessage = "Mirror name cannot be empty."
return errResponse, nil
return nil, errors.New("mirror name cannot be empty")
}

if req.NumberOfSyncs <= 0 || req.NumberOfSyncs > syncRequestLimit {
slog.Error("Invalid sync number request",
slog.Any("requested_number_of_syncs", req.NumberOfSyncs))
errResponse.ErrorMessage = fmt.Sprintf("Sync number request must be between 1 and %d (inclusive). Requested number: %d",
return nil, fmt.Errorf("sync number request must be between 1 and %d (inclusive). Requested number: %d",
syncRequestLimit, req.NumberOfSyncs)
return errResponse, nil
}

mirrorExists, err := h.CheckIfMirrorNameExists(ctx, req.FlowJobName)
if err != nil {
slog.Error("Server error: unable to check if mirror exists", slog.Any("error", err))
errResponse.ErrorMessage = "Server error: unable to check if mirror " + req.FlowJobName + " exists."
return errResponse, nil
return nil, fmt.Errorf("server error: unable to check if mirror %s exists", req.FlowJobName)
}
if !mirrorExists {
slog.Error("Mirror does not exist", slog.Any("mirror_name", req.FlowJobName))
errResponse.ErrorMessage = fmt.Sprintf("Mirror %s does not exist", req.FlowJobName)
return errResponse, nil
return nil, fmt.Errorf("mirror %s does not exist", req.FlowJobName)
}

mirrorStatusResponse, _ := h.MirrorStatus(ctx, &protos.MirrorStatusRequest{
mirrorStatusResponse, err := h.MirrorStatus(ctx, &protos.MirrorStatusRequest{
FlowJobName: req.FlowJobName,
})
if mirrorStatusResponse.ErrorMessage != "" {
if err != nil {
slog.Error("Server error: unable to check the status of mirror",
slog.Any("mirror", req.FlowJobName),
slog.Any("error", mirrorStatusResponse.ErrorMessage))
errResponse.ErrorMessage = fmt.Sprintf("Server error: unable to check the status of mirror %s: %s",
req.FlowJobName, mirrorStatusResponse.ErrorMessage)
return errResponse, nil
slog.String("mirror", req.FlowJobName),
slog.Any("error", err))
return nil, err
}

if mirrorStatusResponse.CurrentFlowState != protos.FlowStatus_STATUS_PAUSED {
slog.Error("Mirror is not paused", slog.Any("mirror", req.FlowJobName))
errResponse.ErrorMessage = fmt.Sprintf(`Requested mirror %s is not paused. This is a requirement.
The mirror can be paused via PeerDB UI. Please follow %s`,
return nil, fmt.Errorf(
"requested mirror %s is not paused. This is a requirement. The mirror can be paused via PeerDB UI. Please follow %s",
req.FlowJobName, peerdbPauseGuideDocLink)
return errResponse, nil
}

// Parallel sync-normalise should not be enabled
parallelSyncNormaliseEnabled, err := peerdbenv.PeerDBEnableParallelSyncNormalize(ctx, nil)
if err != nil {
slog.Error("Server error: unable to check if parallel sync-normalise is enabled", slog.Any("error", err))
errResponse.ErrorMessage = "Server error: unable to check if parallel sync-normalise is enabled."
return errResponse, nil
return nil, errors.New("server error: unable to check if parallel sync-normalise is enabled")
}
if parallelSyncNormaliseEnabled {
errResponse.ErrorMessage = "Parallel sync-normalise is enabled. Please contact PeerDB support to disable it to proceed."
return errResponse, nil
return nil, errors.New("parallel sync-normalise is enabled. Please contact PeerDB support to disable it to proceed")
}
// ---- REQUEST VALIDATED ----

Expand All @@ -99,9 +83,7 @@ func (h *FlowRequestHandler) CustomSyncFlow(
slog.Error("Unable to kick off custom sync for mirror",
slog.Any("mirror", req.FlowJobName),
slog.Any("error", err))
errResponse.ErrorMessage = fmt.Sprintf("Unable to kick off sync for mirror %s:%s",
req.FlowJobName, err.Error())
return errResponse, nil
return nil, fmt.Errorf("unable to kick off sync for mirror %s: %w", req.FlowJobName, err)
}

slog.Info("Custom sync started for mirror",
Expand All @@ -111,7 +93,5 @@ func (h *FlowRequestHandler) CustomSyncFlow(
return &protos.CreateCustomSyncResponse{
FlowJobName: req.FlowJobName,
NumberOfSyncs: req.NumberOfSyncs,
ErrorMessage: "",
Ok: true,
}, nil
}
37 changes: 8 additions & 29 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,7 @@ func (h *FlowRequestHandler) FlowStateChange(
}
}

return &protos.FlowStateChangeResponse{
Ok: true,
}, nil
return &protos.FlowStateChangeResponse{}, nil
}

func (h *FlowRequestHandler) handleCancelWorkflow(ctx context.Context, workflowID, runID string) error {
Expand Down Expand Up @@ -494,50 +492,33 @@ func (h *FlowRequestHandler) DropPeer(
req *protos.DropPeerRequest,
) (*protos.DropPeerResponse, error) {
if req.PeerName == "" {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("Peer %s not found", req.PeerName),
}, fmt.Errorf("peer %s not found", req.PeerName)
return nil, fmt.Errorf("peer %s not found", req.PeerName)
}

// Check if peer name is in flows table
peerID, _, err := h.getPeerID(ctx, req.PeerName)
if err != nil {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("Failed to obtain peer ID for peer %s: %v", req.PeerName, err),
}, fmt.Errorf("failed to obtain peer ID for peer %s: %v", req.PeerName, err)
return nil, fmt.Errorf("failed to obtain peer ID for peer %s: %w", req.PeerName, err)
}

var inMirror pgtype.Int8
queryErr := h.pool.QueryRow(ctx,
"SELECT COUNT(*) FROM flows WHERE source_peer=$1 or destination_peer=$2",
peerID, peerID).Scan(&inMirror)
if queryErr != nil {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("Failed to check for existing mirrors with peer %s: %v", req.PeerName, queryErr),
}, fmt.Errorf("failed to check for existing mirrors with peer %s", req.PeerName)
return nil, fmt.Errorf("failed to check for existing mirrors with peer %s: %w", req.PeerName, queryErr)
}

if inMirror.Int64 != 0 {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("Peer %s is currently involved in an ongoing mirror.", req.PeerName),
}, nil
return nil, fmt.Errorf("peer %s is currently involved in an ongoing mirror", req.PeerName)
}

_, delErr := h.pool.Exec(ctx, "DELETE FROM peers WHERE name = $1", req.PeerName)
if delErr != nil {
return &protos.DropPeerResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("failed to delete peer %s from metadata table: %v", req.PeerName, delErr),
}, fmt.Errorf("failed to delete peer %s from metadata table: %v", req.PeerName, delErr)
return nil, fmt.Errorf("failed to delete peer %s from metadata table: %w", req.PeerName, delErr)
}

return &protos.DropPeerResponse{
Ok: true,
}, nil
return &protos.DropPeerResponse{}, nil
}

func (h *FlowRequestHandler) getWorkflowID(ctx context.Context, flowJobName string) (string, error) {
Expand Down Expand Up @@ -590,7 +571,5 @@ func (h *FlowRequestHandler) ResyncMirror(
if err != nil {
return nil, err
}
return &protos.ResyncMirrorResponse{
Ok: true,
}, nil
return &protos.ResyncMirrorResponse{}, nil
}
45 changes: 6 additions & 39 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,56 +67,31 @@ func (h *FlowRequestHandler) MirrorStatus(
workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
if err != nil {
slog.Error("unable to get the workflow ID of mirror", slog.Any("error", err))
return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN,
ErrorMessage: "unable to get the workflow ID of mirror " + req.FlowJobName,
Ok: false,
}, nil
return nil, fmt.Errorf("unable to get the workflow ID of mirror %s: %w", req.FlowJobName, err)
}

currState, err := h.getWorkflowStatus(ctx, workflowID)
if err != nil {
slog.Error("unable to get the running status of mirror", slog.Any("error", err))
return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN,
ErrorMessage: "unable to get the running status of mirror " + req.FlowJobName,
Ok: false,
}, nil
return nil, fmt.Errorf("unable to get the running status of mirror %s: %w", req.FlowJobName, err)
}

createdAt, err := h.getMirrorCreatedAt(ctx, req.FlowJobName)
if err != nil {
return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN,
ErrorMessage: "unable to get the creation time of mirror " + req.FlowJobName,
Ok: false,
}, nil
return nil, fmt.Errorf("unable to get the creation time of mirror %s: %w", req.FlowJobName, err)
}

if req.IncludeFlowInfo {
cdcFlow, err := h.isCDCFlow(ctx, req.FlowJobName)
if err != nil {
slog.Error("unable to determine if mirror is cdc", slog.Any("error", err))
return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN,
ErrorMessage: "unable to determine if mirror" + req.FlowJobName + "is of type CDC.",
Ok: false,
}, nil
return nil, fmt.Errorf("unable to determine if mirror %s is of type CDC: %w", req.FlowJobName, err)
}
if cdcFlow {
cdcStatus, err := h.cdcFlowStatus(ctx, req)
if err != nil {
slog.Error("unable to obtain CDC information for mirror", slog.Any("error", err))
return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN,
ErrorMessage: "unable to obtain CDC information for mirror " + req.FlowJobName,
Ok: false,
}, nil
return nil, fmt.Errorf("unable to obtain CDC information for mirror %s: %w", req.FlowJobName, err)
}

return &protos.MirrorStatusResponse{
Expand All @@ -125,19 +100,13 @@ func (h *FlowRequestHandler) MirrorStatus(
CdcStatus: cdcStatus,
},
CurrentFlowState: currState,
Ok: true,
CreatedAt: timestamppb.New(*createdAt),
}, nil
} else {
qrepStatus, err := h.qrepFlowStatus(ctx, req)
if err != nil {
slog.Error("unable to obtain qrep information for mirror", slog.Any("error", err))
return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN,
ErrorMessage: "unable to obtain snapshot information for mirror " + req.FlowJobName,
Ok: false,
}, nil
return nil, fmt.Errorf("unable to obtain snapshot information for mirror %s: %w", req.FlowJobName, err)
}

return &protos.MirrorStatusResponse{
Expand All @@ -146,7 +115,6 @@ func (h *FlowRequestHandler) MirrorStatus(
QrepStatus: qrepStatus,
},
CurrentFlowState: currState,
Ok: true,
CreatedAt: timestamppb.New(*createdAt),
}, nil
}
Expand All @@ -155,7 +123,6 @@ func (h *FlowRequestHandler) MirrorStatus(
return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
CurrentFlowState: currState,
Ok: true,
CreatedAt: timestamppb.New(*createdAt),
}, nil
}
Expand Down
Loading

0 comments on commit 8ae44d3

Please sign in to comment.