diff --git a/flow/cmd/custom_sync.go b/flow/cmd/custom_sync.go index bba0c9766a..968783f27a 100644 --- a/flow/cmd/custom_sync.go +++ b/flow/cmd/custom_sync.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "errors" "fmt" "log/slog" @@ -17,69 +18,52 @@ const ( func (h *FlowRequestHandler) CustomSyncFlow( ctx context.Context, req *protos.CreateCustomSyncRequest, ) (*protos.CreateCustomSyncResponse, error) { - errResponse := &protos.CreateCustomSyncResponse{ - FlowJobName: req.FlowJobName, - NumberOfSyncs: 0, - ErrorMessage: "error while processing request", - Ok: false, - } - // ---- REQUEST VALIDATION ---- if req.FlowJobName == "" { - errResponse.ErrorMessage = "Mirror name cannot be empty." - return errResponse, nil + return nil, errors.New("mirror name cannot be empty") } if req.NumberOfSyncs <= 0 || req.NumberOfSyncs > syncRequestLimit { slog.Error("Invalid sync number request", slog.Any("requested_number_of_syncs", req.NumberOfSyncs)) - errResponse.ErrorMessage = fmt.Sprintf("Sync number request must be between 1 and %d (inclusive). Requested number: %d", + return nil, fmt.Errorf("sync number request must be between 1 and %d (inclusive). Requested number: %d", syncRequestLimit, req.NumberOfSyncs) - return errResponse, nil } mirrorExists, err := h.CheckIfMirrorNameExists(ctx, req.FlowJobName) if err != nil { slog.Error("Server error: unable to check if mirror exists", slog.Any("error", err)) - errResponse.ErrorMessage = "Server error: unable to check if mirror " + req.FlowJobName + " exists." - return errResponse, nil + return nil, fmt.Errorf("server error: unable to check if mirror %s exists", req.FlowJobName) } if !mirrorExists { slog.Error("Mirror does not exist", slog.Any("mirror_name", req.FlowJobName)) - errResponse.ErrorMessage = fmt.Sprintf("Mirror %s does not exist", req.FlowJobName) - return errResponse, nil + return nil, fmt.Errorf("mirror %s does not exist", req.FlowJobName) } - mirrorStatusResponse, _ := h.MirrorStatus(ctx, &protos.MirrorStatusRequest{ + mirrorStatusResponse, err := h.MirrorStatus(ctx, &protos.MirrorStatusRequest{ FlowJobName: req.FlowJobName, }) - if mirrorStatusResponse.ErrorMessage != "" { + if err != nil { slog.Error("Server error: unable to check the status of mirror", - slog.Any("mirror", req.FlowJobName), - slog.Any("error", mirrorStatusResponse.ErrorMessage)) - errResponse.ErrorMessage = fmt.Sprintf("Server error: unable to check the status of mirror %s: %s", - req.FlowJobName, mirrorStatusResponse.ErrorMessage) - return errResponse, nil + slog.String("mirror", req.FlowJobName), + slog.Any("error", err)) + return nil, err } if mirrorStatusResponse.CurrentFlowState != protos.FlowStatus_STATUS_PAUSED { slog.Error("Mirror is not paused", slog.Any("mirror", req.FlowJobName)) - errResponse.ErrorMessage = fmt.Sprintf(`Requested mirror %s is not paused. This is a requirement. - The mirror can be paused via PeerDB UI. Please follow %s`, + return nil, fmt.Errorf( + "requested mirror %s is not paused. This is a requirement. The mirror can be paused via PeerDB UI. Please follow %s", req.FlowJobName, peerdbPauseGuideDocLink) - return errResponse, nil } // Parallel sync-normalise should not be enabled parallelSyncNormaliseEnabled, err := peerdbenv.PeerDBEnableParallelSyncNormalize(ctx, nil) if err != nil { - slog.Error("Server error: unable to check if parallel sync-normalise is enabled", slog.Any("error", err)) - errResponse.ErrorMessage = "Server error: unable to check if parallel sync-normalise is enabled." - return errResponse, nil + return nil, errors.New("server error: unable to check if parallel sync-normalise is enabled") } if parallelSyncNormaliseEnabled { - errResponse.ErrorMessage = "Parallel sync-normalise is enabled. Please contact PeerDB support to disable it to proceed." - return errResponse, nil + return nil, errors.New("parallel sync-normalise is enabled. Please contact PeerDB support to disable it to proceed") } // ---- REQUEST VALIDATED ---- @@ -99,9 +83,7 @@ func (h *FlowRequestHandler) CustomSyncFlow( slog.Error("Unable to kick off custom sync for mirror", slog.Any("mirror", req.FlowJobName), slog.Any("error", err)) - errResponse.ErrorMessage = fmt.Sprintf("Unable to kick off sync for mirror %s:%s", - req.FlowJobName, err.Error()) - return errResponse, nil + return nil, fmt.Errorf("unable to kick off sync for mirror %s: %w", req.FlowJobName, err) } slog.Info("Custom sync started for mirror", @@ -111,7 +93,5 @@ func (h *FlowRequestHandler) CustomSyncFlow( return &protos.CreateCustomSyncResponse{ FlowJobName: req.FlowJobName, NumberOfSyncs: req.NumberOfSyncs, - ErrorMessage: "", - Ok: true, }, nil } diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 28169c1d21..2b48a29260 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -432,9 +432,7 @@ func (h *FlowRequestHandler) FlowStateChange( } } - return &protos.FlowStateChangeResponse{ - Ok: true, - }, nil + return &protos.FlowStateChangeResponse{}, nil } func (h *FlowRequestHandler) handleCancelWorkflow(ctx context.Context, workflowID, runID string) error { @@ -494,19 +492,13 @@ func (h *FlowRequestHandler) DropPeer( req *protos.DropPeerRequest, ) (*protos.DropPeerResponse, error) { if req.PeerName == "" { - return &protos.DropPeerResponse{ - Ok: false, - ErrorMessage: fmt.Sprintf("Peer %s not found", req.PeerName), - }, fmt.Errorf("peer %s not found", req.PeerName) + return nil, fmt.Errorf("peer %s not found", req.PeerName) } // Check if peer name is in flows table peerID, _, err := h.getPeerID(ctx, req.PeerName) if err != nil { - return &protos.DropPeerResponse{ - Ok: false, - ErrorMessage: fmt.Sprintf("Failed to obtain peer ID for peer %s: %v", req.PeerName, err), - }, fmt.Errorf("failed to obtain peer ID for peer %s: %v", req.PeerName, err) + return nil, fmt.Errorf("failed to obtain peer ID for peer %s: %w", req.PeerName, err) } var inMirror pgtype.Int8 @@ -514,30 +506,19 @@ func (h *FlowRequestHandler) DropPeer( "SELECT COUNT(*) FROM flows WHERE source_peer=$1 or destination_peer=$2", peerID, peerID).Scan(&inMirror) if queryErr != nil { - return &protos.DropPeerResponse{ - Ok: false, - ErrorMessage: fmt.Sprintf("Failed to check for existing mirrors with peer %s: %v", req.PeerName, queryErr), - }, fmt.Errorf("failed to check for existing mirrors with peer %s", req.PeerName) + return nil, fmt.Errorf("failed to check for existing mirrors with peer %s: %w", req.PeerName, queryErr) } if inMirror.Int64 != 0 { - return &protos.DropPeerResponse{ - Ok: false, - ErrorMessage: fmt.Sprintf("Peer %s is currently involved in an ongoing mirror.", req.PeerName), - }, nil + return nil, fmt.Errorf("peer %s is currently involved in an ongoing mirror", req.PeerName) } _, delErr := h.pool.Exec(ctx, "DELETE FROM peers WHERE name = $1", req.PeerName) if delErr != nil { - return &protos.DropPeerResponse{ - Ok: false, - ErrorMessage: fmt.Sprintf("failed to delete peer %s from metadata table: %v", req.PeerName, delErr), - }, fmt.Errorf("failed to delete peer %s from metadata table: %v", req.PeerName, delErr) + return nil, fmt.Errorf("failed to delete peer %s from metadata table: %w", req.PeerName, delErr) } - return &protos.DropPeerResponse{ - Ok: true, - }, nil + return &protos.DropPeerResponse{}, nil } func (h *FlowRequestHandler) getWorkflowID(ctx context.Context, flowJobName string) (string, error) { @@ -590,7 +571,5 @@ func (h *FlowRequestHandler) ResyncMirror( if err != nil { return nil, err } - return &protos.ResyncMirrorResponse{ - Ok: true, - }, nil + return &protos.ResyncMirrorResponse{}, nil } diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index c1f00285cf..abf6c95f87 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -67,56 +67,31 @@ func (h *FlowRequestHandler) MirrorStatus( workflowID, err := h.getWorkflowID(ctx, req.FlowJobName) if err != nil { slog.Error("unable to get the workflow ID of mirror", slog.Any("error", err)) - return &protos.MirrorStatusResponse{ - FlowJobName: req.FlowJobName, - CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN, - ErrorMessage: "unable to get the workflow ID of mirror " + req.FlowJobName, - Ok: false, - }, nil + return nil, fmt.Errorf("unable to get the workflow ID of mirror %s: %w", req.FlowJobName, err) } currState, err := h.getWorkflowStatus(ctx, workflowID) if err != nil { slog.Error("unable to get the running status of mirror", slog.Any("error", err)) - return &protos.MirrorStatusResponse{ - FlowJobName: req.FlowJobName, - CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN, - ErrorMessage: "unable to get the running status of mirror " + req.FlowJobName, - Ok: false, - }, nil + return nil, fmt.Errorf("unable to get the running status of mirror %s: %w", req.FlowJobName, err) } createdAt, err := h.getMirrorCreatedAt(ctx, req.FlowJobName) if err != nil { - return &protos.MirrorStatusResponse{ - FlowJobName: req.FlowJobName, - CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN, - ErrorMessage: "unable to get the creation time of mirror " + req.FlowJobName, - Ok: false, - }, nil + return nil, fmt.Errorf("unable to get the creation time of mirror %s: %w", req.FlowJobName, err) } if req.IncludeFlowInfo { cdcFlow, err := h.isCDCFlow(ctx, req.FlowJobName) if err != nil { slog.Error("unable to determine if mirror is cdc", slog.Any("error", err)) - return &protos.MirrorStatusResponse{ - FlowJobName: req.FlowJobName, - CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN, - ErrorMessage: "unable to determine if mirror" + req.FlowJobName + "is of type CDC.", - Ok: false, - }, nil + return nil, fmt.Errorf("unable to determine if mirror %s is of type CDC: %w", req.FlowJobName, err) } if cdcFlow { cdcStatus, err := h.cdcFlowStatus(ctx, req) if err != nil { slog.Error("unable to obtain CDC information for mirror", slog.Any("error", err)) - return &protos.MirrorStatusResponse{ - FlowJobName: req.FlowJobName, - CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN, - ErrorMessage: "unable to obtain CDC information for mirror " + req.FlowJobName, - Ok: false, - }, nil + return nil, fmt.Errorf("unable to obtain CDC information for mirror %s: %w", req.FlowJobName, err) } return &protos.MirrorStatusResponse{ @@ -125,19 +100,13 @@ func (h *FlowRequestHandler) MirrorStatus( CdcStatus: cdcStatus, }, CurrentFlowState: currState, - Ok: true, CreatedAt: timestamppb.New(*createdAt), }, nil } else { qrepStatus, err := h.qrepFlowStatus(ctx, req) if err != nil { slog.Error("unable to obtain qrep information for mirror", slog.Any("error", err)) - return &protos.MirrorStatusResponse{ - FlowJobName: req.FlowJobName, - CurrentFlowState: protos.FlowStatus_STATUS_UNKNOWN, - ErrorMessage: "unable to obtain snapshot information for mirror " + req.FlowJobName, - Ok: false, - }, nil + return nil, fmt.Errorf("unable to obtain snapshot information for mirror %s: %w", req.FlowJobName, err) } return &protos.MirrorStatusResponse{ @@ -146,7 +115,6 @@ func (h *FlowRequestHandler) MirrorStatus( QrepStatus: qrepStatus, }, CurrentFlowState: currState, - Ok: true, CreatedAt: timestamppb.New(*createdAt), }, nil } @@ -155,7 +123,6 @@ func (h *FlowRequestHandler) MirrorStatus( return &protos.MirrorStatusResponse{ FlowJobName: req.FlowJobName, CurrentFlowState: currState, - Ok: true, CreatedAt: timestamppb.New(*createdAt), }, nil } diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index acb87a40d9..f7d8501996 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -29,53 +29,37 @@ func (h *FlowRequestHandler) ValidateCDCMirror( mirrorExists, existCheckErr := h.CheckIfMirrorNameExists(ctx, req.ConnectionConfigs.FlowJobName) if existCheckErr != nil { slog.Error("/validatecdc failed to check if mirror name exists", slog.Any("error", existCheckErr)) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, existCheckErr + return nil, existCheckErr } if mirrorExists { displayErr := fmt.Errorf("mirror with name %s already exists", req.ConnectionConfigs.FlowJobName) - h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - fmt.Sprint(displayErr), - ) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, displayErr + h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, displayErr.Error()) + return nil, displayErr } } if req.ConnectionConfigs == nil { slog.Error("/validatecdc connection configs is nil") - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, errors.New("connection configs is nil") + return nil, errors.New("connection configs is nil") } sourcePeer, err := connectors.LoadPeer(ctx, h.pool, req.ConnectionConfigs.SourceName) if err != nil { slog.Error("/validatecdc failed to load source peer", slog.String("peer", req.ConnectionConfigs.SourceName)) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, err + return nil, err } sourcePeerConfig := sourcePeer.GetPostgresConfig() if sourcePeerConfig == nil { slog.Error("/validatecdc source peer config is not postgres", slog.String("peer", req.ConnectionConfigs.SourceName)) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, errors.New("source peer config is not postgres") + return nil, errors.New("source peer config is not postgres") } pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig) if err != nil { displayErr := fmt.Errorf("failed to create postgres connector: %v", err) - h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - fmt.Sprint(displayErr), - ) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, displayErr + h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, displayErr.Error()) + return nil, displayErr } defer pgPeer.Close() @@ -87,20 +71,14 @@ func (h *FlowRequestHandler) ValidateCDCMirror( h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, displayErr.Error(), ) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, displayErr + return nil, displayErr } // Check permissions of postgres peer if err := pgPeer.CheckReplicationPermissions(ctx, sourcePeerConfig.User); err != nil { displayErr := fmt.Errorf("failed to check replication permissions: %v", err) - h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - fmt.Sprint(displayErr), - ) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, displayErr + h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, displayErr.Error()) + return nil, displayErr } } @@ -110,12 +88,8 @@ func (h *FlowRequestHandler) ValidateCDCMirror( parsedTable, parseErr := utils.ParseSchemaTable(tableMapping.SourceTableIdentifier) if parseErr != nil { displayErr := fmt.Errorf("invalid source table identifier: %s", parseErr) - h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - fmt.Sprint(displayErr), - ) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, displayErr + h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, displayErr.Error()) + return nil, displayErr } sourceTables = append(sourceTables, parsedTable) @@ -135,37 +109,25 @@ func (h *FlowRequestHandler) ValidateCDCMirror( if err := pgPeer.CheckPublicationCreationPermissions(ctx, srcTableNames); err != nil { displayErr := fmt.Errorf("invalid publication creation permissions: %v", err) - h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - fmt.Sprint(displayErr), - ) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, displayErr + h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, displayErr.Error()) + return nil, displayErr } } if err := pgPeer.CheckSourceTables(ctx, sourceTables, pubName, noCDC); err != nil { displayErr := fmt.Errorf("provided source tables invalidated: %v", err) slog.Error(displayErr.Error()) - h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - fmt.Sprint(displayErr), - ) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, displayErr + h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, displayErr.Error()) + return nil, displayErr } for _, tm := range req.ConnectionConfigs.TableMappings { for _, col := range tm.Columns { if !CustomColumnTypeRegex.MatchString(col.DestinationType) { - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, fmt.Errorf("invalid custom column type %s", col.DestinationType) + return nil, fmt.Errorf("invalid custom column type %s", col.DestinationType) } if !CustomColumnNameRegex.MatchString(col.DestinationName) { - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, fmt.Errorf("invalid custom column name %s", col.DestinationName) + return nil, fmt.Errorf("invalid custom column name %s", col.DestinationName) } } } @@ -173,20 +135,16 @@ func (h *FlowRequestHandler) ValidateCDCMirror( dstPeer, err := connectors.LoadPeer(ctx, h.pool, req.ConnectionConfigs.DestinationName) if err != nil { slog.Error("/validatecdc failed to load destination peer", slog.String("peer", req.ConnectionConfigs.DestinationName)) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, err + return nil, err } if dstPeer.GetClickhouseConfig() != nil { chPeer, err := connclickhouse.NewClickHouseConnector(ctx, nil, dstPeer.GetClickhouseConfig()) if err != nil { - displayErr := fmt.Errorf("failed to create clickhouse connector: %v", err) + displayErr := fmt.Errorf("failed to create clickhouse connector: %w", err) h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - fmt.Sprint(displayErr), + displayErr.Error(), ) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, displayErr + return nil, displayErr } defer chPeer.Close() @@ -194,27 +152,21 @@ func (h *FlowRequestHandler) ValidateCDCMirror( if err != nil { displayErr := fmt.Errorf("failed to get source table schema: %v", err) h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - fmt.Sprint(displayErr), + displayErr.Error(), ) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, displayErr + return nil, displayErr } err = chPeer.CheckDestinationTables(ctx, req.ConnectionConfigs, res) if err != nil { h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - fmt.Sprint(err), + err.Error(), ) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, err + return nil, err } } - return &protos.ValidateCDCMirrorResponse{ - Ok: true, - }, nil + return &protos.ValidateCDCMirrorResponse{}, nil } func (h *FlowRequestHandler) CheckIfMirrorNameExists(ctx context.Context, mirrorName string) (bool, error) { diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 8548b91152..6c0c52224e 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -70,16 +70,8 @@ impl FlowGrpcClient { let drop_peer_req = pt::peerdb_route::DropPeerRequest { peer_name: String::from(peer_name), }; - let response = self.client.drop_peer(drop_peer_req).await?; - let drop_response = response.into_inner(); - if drop_response.ok { - Ok(()) - } else { - Err(anyhow::anyhow!(format!( - "failed to drop peer: {:?}", - drop_response.error_message - ))) - } + self.client.drop_peer(drop_peer_req).await?; + Ok(()) } pub async fn flow_state_change( @@ -94,16 +86,8 @@ impl FlowGrpcClient { flow_config_update, drop_mirror_stats: false, }; - let response = self.client.flow_state_change(state_change_req).await?; - let state_change_response = response.into_inner(); - if state_change_response.ok { - Ok(()) - } else { - Err(anyhow::anyhow!(format!( - "failed to change the state of flow job {}: {:?}", - flow_job_name, state_change_response.error_message - ))) - } + self.client.flow_state_change(state_change_req).await?; + Ok(()) } pub async fn start_peer_flow_job( @@ -308,15 +292,7 @@ impl FlowGrpcClient { flow_job_name: flow_job_name.to_owned(), drop_stats: true, }; - let response = self.client.resync_mirror(resync_mirror_req).await?; - let resync_mirror_response = response.into_inner(); - if resync_mirror_response.ok { - Ok(()) - } else { - Err(anyhow::anyhow!(format!( - "failed to resync mirror for flow job {}: {:?}", - flow_job_name, resync_mirror_response.error_message - ))) - } + self.client.resync_mirror(resync_mirror_req).await?; + Ok(()) } } diff --git a/protos/route.proto b/protos/route.proto index 9b4f6ba2db..ddec284460 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -33,8 +33,6 @@ message CreateCustomSyncRequest { message CreateCustomSyncResponse { string flow_job_name = 1; int32 number_of_syncs = 2; - string error_message = 3; - bool ok = 4; } message AlertConfig { @@ -120,8 +118,6 @@ message DropPeerRequest { } message DropPeerResponse { - bool ok = 1; - string error_message = 2; } enum ValidatePeerStatus { @@ -323,9 +319,7 @@ message MirrorStatusResponse { QRepMirrorStatus qrep_status = 2; CDCMirrorStatus cdc_status = 3; } - string error_message = 4; peerdb_flow.FlowStatus current_flow_state = 5; - bool ok = 6; google.protobuf.Timestamp created_at = 7; } @@ -364,7 +358,6 @@ message ListMirrorLogsResponse { } message ValidateCDCMirrorResponse{ - bool ok = 1; } message ListMirrorsItem { @@ -398,8 +391,6 @@ message FlowStateChangeRequest { bool drop_mirror_stats = 6; } message FlowStateChangeResponse { - bool ok = 1; - string error_message = 2; } message PeerDBVersionRequest { @@ -414,8 +405,6 @@ message ResyncMirrorRequest { } message ResyncMirrorResponse { - bool ok = 1; - string error_message = 2; } service FlowService { diff --git a/ui/app/mirrors/[mirrorId]/edit/page.tsx b/ui/app/mirrors/[mirrorId]/edit/page.tsx index 766fb2141c..3b48151761 100644 --- a/ui/app/mirrors/[mirrorId]/edit/page.tsx +++ b/ui/app/mirrors/[mirrorId]/edit/page.tsx @@ -53,21 +53,20 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { const { push } = useRouter(); const fetchStateAndUpdateDeps = useCallback(async () => { - await getMirrorState(mirrorId).then((res) => { - setMirrorState(res); - - setConfig({ - batchSize: - (res as MirrorStatusResponse).cdcStatus?.config?.maxBatchSize || - defaultBatchSize, - idleTimeout: - (res as MirrorStatusResponse).cdcStatus?.config?.idleTimeoutSeconds || - defaultIdleTimeout, - additionalTables: [], - removedTables: [], - numberOfSyncs: 0, - updatedEnv: {}, - }); + const res = await getMirrorState(mirrorId); + setMirrorState(res); + + setConfig({ + batchSize: + (res as MirrorStatusResponse).cdcStatus?.config?.maxBatchSize || + defaultBatchSize, + idleTimeout: + (res as MirrorStatusResponse).cdcStatus?.config?.idleTimeoutSeconds || + defaultIdleTimeout, + additionalTables: [], + removedTables: [], + numberOfSyncs: 0, + updatedEnv: {}, }); }, [mirrorId, defaultBatchSize, defaultIdleTimeout]); diff --git a/ui/app/mirrors/[mirrorId]/handlers.ts b/ui/app/mirrors/[mirrorId]/handlers.ts index 4f3de81d79..7e68f26a53 100644 --- a/ui/app/mirrors/[mirrorId]/handlers.ts +++ b/ui/app/mirrors/[mirrorId]/handlers.ts @@ -14,6 +14,7 @@ export const getMirrorState = async ( include_flow_info: true, }), }); + if (!res.ok) throw res.json(); return res.json(); }; diff --git a/ui/app/mirrors/[mirrorId]/page.tsx b/ui/app/mirrors/[mirrorId]/page.tsx index b3637af32c..5516e0986a 100644 --- a/ui/app/mirrors/[mirrorId]/page.tsx +++ b/ui/app/mirrors/[mirrorId]/page.tsx @@ -23,12 +23,17 @@ type EditMirrorProps = { export default function ViewMirror({ params: { mirrorId } }: EditMirrorProps) { const [mirrorState, setMirrorState] = useState(); + const [errorMessage, setErrorMessage] = useState(''); const [mounted, setMounted] = useState(false); const fetchState = useCallback(async () => { - const res = await getMirrorState(mirrorId); - setMirrorState(res); setMounted(true); + try { + const res = await getMirrorState(mirrorId); + setMirrorState(res); + } catch (ex: any) { + setErrorMessage(ex.message); + } }, [mirrorId]); useEffect(() => { fetchState(); @@ -38,7 +43,7 @@ export default function ViewMirror({ params: { mirrorId } }: EditMirrorProps) { return <>; } - if (mirrorState?.errorMessage) { + if (errorMessage) { return ; } diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index 64c91431f9..ec6729defc 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -18,7 +18,6 @@ import { PeerSchemasResponse, SchemaTablesResponse, TableColumnsResponse, - ValidateCDCMirrorResponse, } from '@/grpc_generated/route'; import { Dispatch, SetStateAction } from 'react'; @@ -512,10 +511,7 @@ export async function handleValidateCDC( }), }); if (statusRes.ok) { - const status: ValidateCDCMirrorResponse = await statusRes.json(); - if (status.ok) { - notifyErr('CDC Mirror is valid', true); - } + notifyErr('CDC Mirror is valid', true); } else { const errRes = await statusRes.json(); notifyErr('CDC Mirror is invalid: ' + errRes.message); diff --git a/ui/components/DropDialog.tsx b/ui/components/DropDialog.tsx index 93ed6bdf8d..221260d034 100644 --- a/ui/components/DropDialog.tsx +++ b/ui/components/DropDialog.tsx @@ -2,7 +2,6 @@ import { changeFlowState } from '@/app/mirrors/[mirrorId]/handlers'; import { DeleteScript } from '@/app/scripts/handlers'; import { FlowStatus } from '@/grpc_generated/flow'; -import { DropPeerResponse } from '@/grpc_generated/route'; import { Button } from '@/lib/Button'; import { Checkbox } from '@/lib/Checkbox'; import { Dialog, DialogClose } from '@/lib/Dialog'; @@ -71,20 +70,21 @@ export const DropDialog = ({ } setLoading(true); - const dropRes: DropPeerResponse = await fetch('api/v1/peers/drop', { + const dropRes = await fetch('api/v1/peers/drop', { method: 'POST', body: JSON.stringify(dropArgs), - }).then((res) => res.json()); + }); setLoading(false); - if (dropRes.ok !== true) + if (dropRes.ok) { + setMsg('Peer dropped successfully.'); + window.location.reload(); + } else { + const dropResError = await dropRes.json(); setMsg( `Unable to drop peer ${dropArgs.peerName}. ${ - dropRes.errorMessage ?? '' + dropResError.message ?? '' }` ); - else { - setMsg('Peer dropped successfully.'); - window.location.reload(); } }; diff --git a/ui/components/ResyncDialog.tsx b/ui/components/ResyncDialog.tsx index 312fa1a948..65c88bff1f 100644 --- a/ui/components/ResyncDialog.tsx +++ b/ui/components/ResyncDialog.tsx @@ -30,27 +30,27 @@ export const ResyncDialog = ({ mirrorName }: ResyncDialogProps) => { msg: 'Requesting a resync. You can close this dialog and check the status', color: 'base', }); - const resyncRes = await fetch('/api/v1/mirrors/resync', { + const resyncResponse = await fetch('/api/v1/mirrors/resync', { method: 'POST', body: JSON.stringify({ flowJobName: mirrorName, dropStats: true, } as ResyncMirrorRequest), - }).then((res) => res.json()); - if (resyncRes.ok !== true) { + }); + if (resyncResponse.ok) { + setMsg({ + msg: 'Resync has been initiated. You may reload this window to see the progress.', + color: 'positive', + }); + setSyncing(false); + } else { + const resyncRes = await resyncResponse.json(); setMsg({ msg: `Unable to resync mirror ${mirrorName}. ${resyncRes.message ?? ''}`, color: 'destructive', }); setSyncing(false); - return; } - setMsg({ - msg: 'Resync has been initiated. You may reload this window to see the progress.', - color: 'positive', - }); - setSyncing(false); - return; }; return (