diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index ed9d2b720d..bd7d0ee922 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -218,6 +218,7 @@ func (a *FlowableActivity) CreateNormalizedTable( ) (*protos.SetupNormalizedTableBatchOutput, error) { logger := activity.GetLogger(ctx) ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName) + a.Alerter.LogFlowInfo(ctx, config.FlowName, "Setting up destination tables") conn, err := connectors.GetByNameAs[connectors.NormalizedTablesConnector](ctx, config.Env, a.CatalogPool, config.PeerName) if err != nil { if errors.Is(err, errors.ErrUnsupported) { @@ -246,6 +247,7 @@ func (a *FlowableActivity) CreateNormalizedTable( }) defer shutdown() + a.Alerter.LogFlowInfo(ctx, config.FlowName, "Setting up destination tables") tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping)) for tableIdentifier, tableSchema := range tableNameSchemaMapping { existing, err := conn.SetupNormalizedTable( @@ -264,15 +266,19 @@ func (a *FlowableActivity) CreateNormalizedTable( numTablesSetup.Add(1) if !existing { logger.Info("created table " + tableIdentifier) + a.Alerter.LogFlowInfo(ctx, config.FlowName, "created table "+tableIdentifier+" in destination") } else { logger.Info("table already exists " + tableIdentifier) } + } if err := conn.FinishSetupNormalizedTables(ctx, tx); err != nil { return nil, fmt.Errorf("failed to commit normalized tables tx: %w", err) } + a.Alerter.LogFlowInfo(ctx, config.FlowName, "All destination tables have been setup") + return &protos.SetupNormalizedTableBatchOutput{ TableExistsMapping: tableExistsMapping, }, nil @@ -510,6 +516,8 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, } } + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, fmt.Sprintf("obtained partitions for table %s", config.WatermarkTable)) + return &protos.QRepParitionResult{ Partitions: partitions, }, nil @@ -577,6 +585,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, } } + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "replicated all rows to destination for table "+config.DestinationTableIdentifier) return nil } @@ -638,6 +647,8 @@ func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropF return pullCleanupErr } + a.Alerter.LogFlowInfo(ctx, req.FlowJobName, "Cleaned up source peer replication objects. Any replication slot or publication created by PeerDB has been removed.") + return nil } @@ -657,6 +668,8 @@ func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos. return syncFlowCleanupErr } + a.Alerter.LogFlowInfo(ctx, req.FlowJobName, "Cleaned up destination peer replication objects. Any PeerDB metadata storage has been dropped.") + return nil } @@ -897,6 +910,8 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena } } + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Resync completed for all tables") + return renameOutput, tx.Commit(ctx) } @@ -970,6 +985,9 @@ func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *prot if err != nil { a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err) } + + a.Alerter.LogFlowInfo(ctx, cfg.FlowJobName, fmt.Sprintf("ensured %d tables exist in publication %s", + len(additionalTableMappings), cfg.PublicationName)) return err } @@ -993,6 +1011,9 @@ func (a *FlowableActivity) RemoveTablesFromPublication( if err != nil { a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err) } + + a.Alerter.LogFlowInfo(ctx, cfg.FlowJobName, fmt.Sprintf("removed %d tables from publication %s", + len(removedTablesMapping), cfg.PublicationName)) return err } diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index a9f58a5f3f..7d0d8cf022 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -229,6 +229,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon return nil, fmt.Errorf("failed to sync schema: %w", err) } + numberOfSchemaChanges := len(recordBatchSync.SchemaDeltas) + if numberOfSchemaChanges > 0 { + a.Alerter.LogFlowInfo(ctx, flowName, fmt.Sprintf("synced %d schema changes from source to destination", numberOfSchemaChanges)) + } + return &model.SyncCompositeResponse{ SyncResponse: &model.SyncResponse{ CurrentSyncBatchID: -1, @@ -322,7 +327,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon } } - pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords) + pushedRecordsWithCount := fmt.Sprintf("pushed %d records into intermediate storage", numRecords) activity.RecordHeartbeat(ctx, pushedRecordsWithCount) a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount) diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 01c9e748e6..5993bfe463 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -58,7 +58,7 @@ func (a *SnapshotActivity) SetupReplication( ) (*protos.SetupReplicationOutput, error) { ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) logger := activity.GetLogger(ctx) - + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Setting up replication slot and publication") a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job") conn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, nil, a.CatalogPool, config.PeerName) @@ -98,6 +98,8 @@ func (a *SnapshotActivity) SetupReplication( connector: conn, } + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Replication slot and publication setup complete") + return &protos.SetupReplicationOutput{ SlotName: slotInfo.SlotName, SnapshotName: slotInfo.SnapshotName,