Skip to content

Commit

Permalink
Merge pull request #51 from buildbarn/grpc-bg-ctx
Browse files Browse the repository at this point in the history
Push Database Finish Processing Tasks to context background process
  • Loading branch information
trey-ivy authored Oct 28, 2024
2 parents ce9f563 + acedc53 commit 9bc68bb
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 30 deletions.
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ download:

.PHONY: lint
lint:
golangci-lint --timeout 10m run ./...
bazel mod tidy
bazel run //:gazelle
bazel run @com_github_bazelbuild_buildtools//:buildifier
bazel run @cc_mvdan_gofumpt//:gofumpt -- -w -extra $(CURDIR)
bazel run @org_golang_x_lint//golint -- -set_exit_status $(CURDIR)/...
bazel test //...

.PHONY: lint-fix
lint-fix:
Expand Down
2 changes: 1 addition & 1 deletion ent/gen/ent/migrate/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ent/schema/eventfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type EventFile struct {
// Fields of the EventFile.
func (EventFile) Fields() []ent.Field {
return []ent.Field{
field.String("url").Unique().Immutable(),
field.String("url").Immutable(),
field.Time("mod_time"),
field.String("protocol"), // *.bep, *.log, etc
field.String("mime_type"),
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/components/BazelInvocationsTable/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const BazelInvocationsTable: React.FC<Props> = ({ height }) => {
const { loading, data, previousData, error } = useQuery(FIND_BAZEL_INVOCATIONS_QUERY, {
variables,
pollInterval: 120000,
fetchPolicy: 'cache-and-network',
fetchPolicy: "network-only",
});

const onChange: TableProps<BazelInvocationNodeFragment>['onChange'] = useCallback(
Expand Down
30 changes: 22 additions & 8 deletions internal/api/grpc/bes/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {
if event.GetBazelEvent() == nil {
return nil
}

var bazelEvent bes.BuildEvent
err := event.GetBazelEvent().UnmarshalTo(&bazelEvent)
if err != nil {
Expand All @@ -56,25 +55,40 @@ func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {

// Finalize implements BuildEventChannel.Finalize.
func (c *buildEventChannel) Finalize() error {
// defer the ctx so its not reaped when the client closes the connection
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*24)
defer cancel()
summaryReport, err := c.summarizer.FinishProcessing()
if err != nil {
slog.ErrorContext(c.ctx, "FinishProcessing failed", "err", err)
cancel()
return err
}

// Hack for eventFile being required
summaryReport.EventFileURL = fmt.Sprintf(
"grpc://localhost:8082/google.devtools.build.v1/PublishLifecycleEvent?streamID=%s",
c.streamID.String(),
"grpc://localhost:8082/google.devtools.build.v1/PublishLifecycleEvent?invocationId=%s&buildID=%s&component=%s",
c.streamID.GetInvocationId(), c.streamID.GetBuildId(), c.streamID.GetComponent(),
)

slog.InfoContext(c.ctx, "Saving invocation", "id", c.streamID.String())
slog.InfoContext(c.ctx, "Saving invocation",
"InvocationId", c.streamID.GetInvocationId(),
"BuildId", c.streamID.GetBuildId(),
"Component", c.streamID.GetComponent())
startTime := time.Now()
invocation, err := c.workflow.SaveSummary(c.ctx, summaryReport)
// try to get the invocation id
if summaryReport.InvocationID == "" {
summaryReport.InvocationID = c.streamID.GetInvocationId()
slog.WarnContext(c.ctx, "summaryReport was missing invocation ID",
"invocationId", c.streamID.GetInvocationId(),
"buildId", c.streamID.GetBuildId(),
"component", c.streamID.GetComponent())
}
invocation, err := c.workflow.SaveSummary(ctx, summaryReport)
if err != nil {
slog.ErrorContext(c.ctx, "SaveSummary failed", "err", err)
slog.ErrorContext(ctx, "SaveSummary failed", "err", err)
cancel()
return err
}
cancel()
endTime := time.Now()
elapsedTime := endTime.Sub(startTime)
slog.InfoContext(c.ctx, fmt.Sprintf("Saved invocation in %v", elapsedTime.String()), "id", invocation.InvocationID)
Expand Down
31 changes: 28 additions & 3 deletions internal/api/grpc/bes/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,35 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE
// We'll want to ack these once all events are received, as we don't support resumption.
seqNrs := make([]int64, 0)

ack := func(streamID *build.StreamId, sequenceNumber int64) {
ack := func(streamID *build.StreamId, sequenceNumber int64, isClosing bool) {
if err := stream.Send(&build.PublishBuildToolEventStreamResponse{
StreamId: streamID,
SequenceNumber: sequenceNumber,
}); err != nil {
slog.ErrorContext(stream.Context(), "Send failed", "err", err)

// with the option --bes_upload_mode=fully_async or nowait_for_upload_complete
// its not an error when the send fails. the bes gracefully terminated the close
// i.e. sent an EOF. for long running builds that take a while to save to the db (> 1s)
// the context is processed in the background, so by the time we are acknowledging these
// requests, the client connection may have already timed out and these errors can be
// safely ignored
grpcErr := status.Convert(err)
if isClosing &&
grpcErr.Code() == codes.Unavailable &&
grpcErr.Message() == "transport is closing" {
return
}

slog.ErrorContext(
stream.Context(),
"Send failed",
"err",
err,
"streamid",
streamID,
"sequenceNumber",
sequenceNumber,
)
}
}

Expand All @@ -77,7 +100,9 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE
case err := <-errCh:
if err == io.EOF {
slog.InfoContext(stream.Context(), "Stream finished", "event", stream.Context())

if eventCh == nil {
slog.WarnContext(stream.Context(), "No event channel found for stream event", "event", stream.Context())
return nil
}

Expand All @@ -100,7 +125,7 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE

// Ack all events
for _, seqNr := range seqNrs {
ack(streamID, seqNr)
ack(streamID, seqNr, true)
}

return nil
Expand Down
42 changes: 27 additions & 15 deletions pkg/processing/save.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,35 @@ type SaveActor struct {

// SaveSummary saves an invocation summary to the database.
func (act SaveActor) SaveSummary(ctx context.Context, summary *summary.Summary) (*ent.BazelInvocation, error) {
// errors := []error{}
if summary.InvocationID == "" {
slog.ErrorContext(ctx, "No Invocation ID Found on summary", "ctx.Err()", ctx.Err())
return nil, fmt.Errorf("no Invocation ID Found on summary")
}
eventFile, err := act.saveEventFile(ctx, summary)
if err != nil {
slog.ErrorContext(ctx, "failed to save event file", "id", summary.InvocationID, "err", err)
return nil, fmt.Errorf("could not save EventFile: %w", err)
}
buildRecord, err := act.findOrCreateBuild(ctx, summary)
if err != nil {
slog.ErrorContext(ctx, "failed to find or create build", "id", summary.InvocationID, "err", err)
return nil, err
slog.ErrorContext(ctx, "failed to find or create build", "summary.InvocationId", summary.InvocationID, "err", err)
}
metrics, err := act.saveMetrics(ctx, summary.Metrics)
if err != nil {
slog.ErrorContext(ctx, "failed to save metrics", "id", summary.InvocationID, "err", err)
return nil, fmt.Errorf("could not save Metrics: %w", err)
}
targets, err := act.saveTargets(ctx, summary)
if err != nil {
slog.ErrorContext(ctx, "failed to save targets", "id", summary.InvocationID, "err", err)
return nil, fmt.Errorf("could not save Targets: %w", err)
}
tests, err := act.saveTests(ctx, summary)
if err != nil {
slog.ErrorContext(ctx, "failed to save tests", "id", summary.InvocationID, "err", err)
return nil, fmt.Errorf("could not save test results: %w", err)
tests = nil
}
sourcecontrol, err := act.saveSourceControl(ctx, summary)
if err != nil {
slog.ErrorContext(ctx, "failed to save source control information", "id", summary.InvocationID, "err", err)
return nil, fmt.Errorf("could not save source control information: %w", err)
}
bazelInvocation, err := act.saveBazelInvocation(ctx, summary, eventFile, buildRecord, metrics, tests, targets, sourcecontrol)
if err != nil {
Expand Down Expand Up @@ -139,6 +139,9 @@ func (act SaveActor) saveBazelInvocation(
targets []*ent.TargetPair,
sourcecontrol *ent.SourceControl,
) (*ent.BazelInvocation, error) {
if summary == nil {
return nil, fmt.Errorf("no summary object provided")
}
uniqueID, err := uuid.Parse(summary.InvocationID)
if err != nil {
return nil, err
Expand All @@ -158,15 +161,24 @@ func (act SaveActor) saveBazelInvocation(
SetConfigurationMnemonic(summary.ConfigrationMnemonic).
SetPlatformName(summary.PlatformName).
SetNumFetches(summary.NumFetches).
SetBuildLogs(summary.BuildLogs.String()).
SetUserLdap(summary.UserLDAP).
SetRelatedFiles(summary.RelatedFiles).
SetEventFile(eventFile).
SetMetrics(metrics).
SetSourceControl(sourcecontrol).
AddTestCollection(tests...).
AddTargets(targets...)
SetRelatedFiles(summary.RelatedFiles)

if eventFile != nil {
create = create.SetEventFile(eventFile)
}
if metrics != nil {
create = create.SetMetrics(metrics)
}
if tests != nil {
create = create.AddTestCollection(tests...)
}
if targets != nil {
create = create.AddTargets(targets...)
}
if sourcecontrol != nil {
create = create.SetSourceControl(sourcecontrol)
}
if buildRecord != nil {
create = create.SetBuild(buildRecord)
}
Expand Down Expand Up @@ -210,7 +222,7 @@ func (act SaveActor) saveTestFiles(ctx context.Context, files []summary.TestFile
func (act SaveActor) saveOutputGroup(ctx context.Context, ouputGroup summary.OutputGroup) (*ent.OutputGroup, error) {
inlineFiles, err := act.saveTestFiles(ctx, ouputGroup.InlineFiles)
if err != nil {
slog.ErrorContext(ctx, "failed to save output group", "id", "err", err)
slog.ErrorContext(ctx, "failed to save output group", "err", err)
return nil, err
}

Expand Down

0 comments on commit 9bc68bb

Please sign in to comment.