Skip to content

Commit

Permalink
Updated Pulsar message spec (#8)
Browse files Browse the repository at this point in the history
* Comments

* comments

* Added function to return a request id or missing if none is found

* Updated events spec

* Updated Pulsar e2e tests

* Removed commented-out code

* Updated state transition message adapter to reflect changes to the proto

* Generate JobSucceeded on JobRunSucceeded, logging

* Provide Pulsar producer for SubmitFromLog service

* Added utility function to insert error information and stack trace to a logrus.Entry

* Removed deprecated code

* Import ordering

* Removed commented-out code

* Removed commented-out code

* Added isSequencef that takes a message to be logged on error

* Removed commented-out code

* Comments, removed debug logging
  • Loading branch information
Albin Severinson authored and Gary Conway committed Mar 25, 2022
1 parent 2b51e58 commit c9169ba
Show file tree
Hide file tree
Showing 11 changed files with 2,646 additions and 2,303 deletions.
93 changes: 64 additions & 29 deletions e2e/test/pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/apache/pulsar-client-go/pulsar"
"github.com/gogo/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -27,6 +28,9 @@ const pulsarSubscription = "e2e-test-topic"
const armadaUrl = "localhost:50051"
const armadaQueueName = "e2e-test-queue"

// Namespace created by the test setup. Used when submitting test jobs.
const userNamespace = "personal-anonymous"

// Test publishing and receiving a message to/from Pulsar.
func TestPublishReceive(t *testing.T) {
err := withSetup(func(ctx context.Context, _ api.SubmitClient, producer pulsar.Producer, consumer pulsar.Consumer) error {
Expand All @@ -53,43 +57,78 @@ func TestPublishReceive(t *testing.T) {
func TestSubmitJobTransitions(t *testing.T) {
err := withSetup(func(ctx context.Context, client api.SubmitClient, producer pulsar.Producer, consumer pulsar.Consumer) error {
ctxWithTimeout, _ := context.WithTimeout(context.Background(), time.Second)
req := createJobSubmitRequest("personal-anonymous") // Namespace created by test setup
req := createJobSubmitRequest(userNamespace)
_, err := client.SubmitJobs(ctxWithTimeout, req)
if err != nil {
return err
}

numEventsExpected := 5
sequence, err := receiveJobSetSequence(ctx, consumer, armadaQueueName, req.JobSetId, numEventsExpected, 10*time.Second)
expected := &events.EventSequence{
Queue: armadaQueueName,
JobSetName: req.JobSetId,
Events: []*events.EventSequence_Event{
{Event: &events.EventSequence_Event_SubmitJob{}},
{Event: &events.EventSequence_Event_JobRunLeased{}},
{Event: &events.EventSequence_Event_JobRunAssigned{}},
{Event: &events.EventSequence_Event_JobRunRunning{}},
{Event: &events.EventSequence_Event_JobRunSucceeded{}},
{Event: &events.EventSequence_Event_JobSucceeded{}},
},
}

numEventsExpected := len(expected.Events)
actual, err := receiveJobSetSequence(ctx, consumer, armadaQueueName, req.JobSetId, numEventsExpected, 10*time.Second)
if err != nil {
return err
}
if ok := assert.Equal(t, numEventsExpected, len(sequence.Events)); !ok {
return nil
}
if ok := assert.IsType(t, &events.EventSequence_Event_SubmitJob{}, sequence.Events[0].Event); !ok {
return nil
}
if ok := assert.IsType(t, &events.EventSequence_Event_JobRunLeased{}, sequence.Events[1].Event); !ok {
return nil
}
if ok := assert.IsType(t, &events.EventSequence_Event_JobRunAssigned{}, sequence.Events[2].Event); !ok {
return nil
}
if ok := assert.IsType(t, &events.EventSequence_Event_JobRunRunning{}, sequence.Events[3].Event); !ok {
return nil
}
if ok := assert.IsType(t, &events.EventSequence_Event_JobRunSucceeded{}, sequence.Events[4].Event); !ok {

if ok := isSequencef(t, expected, actual, "Event sequence error; printing diff:\n%s", cmp.Diff(expected, actual)); !ok {
return nil
}
// TODO: We should also have a job succeeded message here.
fmt.Printf("Received %d events\n", len(sequence.Events))

return nil
})
assert.NoError(t, err)
}

// Compare an expected sequence of events with the actual sequence.
// Calls into the assert function to make comparison.
// Returns true if the two sequences are equal and false otherwise.
func isSequence(t *testing.T, expected *events.EventSequence, actual *events.EventSequence) (ok bool) {
return isSequencef(t, expected, actual, "")
}

// Like isSequence, but logs msg if a comparison fails.
func isSequencef(t *testing.T, expected *events.EventSequence, actual *events.EventSequence, msg string, args ...interface{}) (ok bool) {
defer func() {
if !ok && msg != "" {
t.Logf(msg, args...)
}
}()
if ok = assert.NotNil(t, expected); !ok {
return false
}
if ok = assert.NotNil(t, actual); !ok {
return false
}
if ok = assert.Equal(t, expected.Queue, actual.Queue); !ok {
return false
}
if ok = assert.Equal(t, expected.JobSetName, actual.JobSetName); !ok {
return false
}
if ok = assert.Equal(t, len(expected.Events), len(actual.Events)); !ok {
return false
}
for i, expectedEvent := range expected.Events {
actualEvent := actual.Events[i]
if ok := assert.IsTypef(t, expectedEvent.Event, actualEvent.Event, "%d-th event differed: %s", i, actualEvent); !ok {
return false
}
}
return true
}

// receiveJobSetSequence receives messages from Pulsar, discarding any messages not for queue and jobSetName.
// The events contained in the remaining messages are collected in a single sequence, which is returned.
func receiveJobSetSequence(ctx context.Context, consumer pulsar.Consumer, queue string, jobSetName string, numEventsExpected int, timeout time.Duration) (result *events.EventSequence, err error) {
Expand Down Expand Up @@ -167,15 +206,11 @@ func withSetup(action func(ctx context.Context, submitClient api.SubmitClient, p
defer conn.Close()
submitClient := api.NewSubmitClient(conn)

// Recreate the queue to make sure it's empty.
err = client.DeleteQueue(submitClient, armadaQueueName)
if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
// Queue didn't exist, which is fine; do nothing.
} else if err != nil {
return err
}
// Create queue needed for tests.
err = client.CreateQueue(submitClient, &api.Queue{Name: armadaQueueName, PriorityFactor: 1})
if err != nil {
if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists {
// Queue already exists; we don't need to create it.
} else if err != nil {
return err
}

Expand Down
14 changes: 14 additions & 0 deletions internal/armada/scheduling/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,16 @@ import (
"github.com/G-Research/armada/pkg/api"
)

// Each executor periodically reports cluster resource usage to the server.
// A cluster is considered inactive if the most recent such report is older than this amount of time.
const activeClusterExpiry = 10 * time.Minute

// Each executor periodically sends a list of all nodes in its cluster to the server.
// These lists are used by the scheduler and are considered valid for this amount of time.
const recentlyActiveClusterExpiry = 60 * time.Minute

// FilterActiveClusters returns the subset of reports corresponding to active clusters.
// A cluster is considered active if the most recent ClusterUsageReport was received less than activeClusterExpiry ago.
func FilterActiveClusters(reports map[string]*api.ClusterUsageReport) map[string]*api.ClusterUsageReport {
result := map[string]*api.ClusterUsageReport{}
now := time.Now()
Expand All @@ -20,6 +27,7 @@ func FilterActiveClusters(reports map[string]*api.ClusterUsageReport) map[string
return result
}

// FilterPoolClusters returns the subset of reports for which the pool has a specific value.
func FilterPoolClusters(pool string, reports map[string]*api.ClusterUsageReport) map[string]*api.ClusterUsageReport {
result := map[string]*api.ClusterUsageReport{}
for id, report := range reports {
Expand All @@ -30,6 +38,7 @@ func FilterPoolClusters(pool string, reports map[string]*api.ClusterUsageReport)
return result
}

// GroupByPool returns a map from pool name to another map, which in turn maps report ids to reports.
func GroupByPool(reports map[string]*api.ClusterUsageReport) map[string]map[string]*api.ClusterUsageReport {
result := map[string]map[string]*api.ClusterUsageReport{}
for id, report := range reports {
Expand All @@ -43,6 +52,8 @@ func GroupByPool(reports map[string]*api.ClusterUsageReport) map[string]map[stri
return result
}

// FilterClusterLeasedReports returns the subset of reports with id in the provided slice of ids.
// ids for which there is no corresponding report are ignored.
func FilterClusterLeasedReports(ids []string, reports map[string]*api.ClusterLeasedReport) map[string]*api.ClusterLeasedReport {
result := map[string]*api.ClusterLeasedReport{}
for _, id := range ids {
Expand All @@ -53,6 +64,7 @@ func FilterClusterLeasedReports(ids []string, reports map[string]*api.ClusterLea
return result
}

// FilterActiveClusterSchedulingInfoReports returns the subset of reports within the expiry time.
func FilterActiveClusterSchedulingInfoReports(reports map[string]*api.ClusterSchedulingInfoReport) map[string]*api.ClusterSchedulingInfoReport {
result := map[string]*api.ClusterSchedulingInfoReport{}
now := time.Now()
Expand All @@ -64,6 +76,7 @@ func FilterActiveClusterSchedulingInfoReports(reports map[string]*api.ClusterSch
return result
}

// GroupSchedulingInfoByPool returns a map from pool name to another map, which in turn maps report ids to reports.
func GroupSchedulingInfoByPool(reports map[string]*api.ClusterSchedulingInfoReport) map[string]map[string]*api.ClusterSchedulingInfoReport {
result := map[string]map[string]*api.ClusterSchedulingInfoReport{}
for id, report := range reports {
Expand All @@ -77,6 +90,7 @@ func GroupSchedulingInfoByPool(reports map[string]*api.ClusterSchedulingInfoRepo
return result
}

// GetClusterReportIds returns a slice composed of all unique report ids in the provided map.
func GetClusterReportIds(reports map[string]*api.ClusterUsageReport) []string {
var result []string
for id := range reports {
Expand Down
6 changes: 6 additions & 0 deletions internal/armada/scheduling/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ func (c *leaseContext) distributeRemainder(limit LeasePayloadLimit) ([]*api.Job,
return jobs, nil
}

// leaseJobs calls into the JobRepository underlying the queue contained in the leaseContext to lease jobs.
// Returns a slice of jobs that were leased.
func (c *leaseContext) leaseJobs(queue *api.Queue, slice common.ComputeResourcesFloat, limit LeasePayloadLimit) ([]*api.Job, common.ComputeResourcesFloat, error) {
jobs := make([]*api.Job, 0)
remainder := slice
Expand Down Expand Up @@ -321,6 +323,7 @@ func (c *leaseContext) decreaseNodeResources(leased []*api.Job, nodeTypeUsage ma
}
}

// removeJobs returns the subset of jobs not in jobsToRemove.
func removeJobs(jobs []*api.Job, jobsToRemove []*api.Job) []*api.Job {
jobsToRemoveIds := make(map[string]bool, len(jobsToRemove))
for _, job := range jobsToRemove {
Expand All @@ -336,6 +339,9 @@ func removeJobs(jobs []*api.Job, jobsToRemove []*api.Job) []*api.Job {
return result
}

// pickQueueRandomly returns a queue randomly selected from the provided map.
// The probability of returning a particular queue AQueue is shares[AQueue] / sharesSum,
// where sharesSum is the sum of all values in the provided map.
func pickQueueRandomly(shares map[*api.Queue]float64) *api.Queue {
sum := 0.0
for _, share := range shares {
Expand Down
9 changes: 9 additions & 0 deletions internal/armada/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,17 @@ func Serve(config *configuration.ArmadaConfig, healthChecks *health.MultiChecker
panic(err)
}

// Create a new producer for this service.
producer, err = pulsarClient.CreateProducer(pulsar.ProducerOptions{
Topic: config.Pulsar.Topic,
})
if err != nil {
panic(err)
}

submitFromLog := server.SubmitFromLog{
Consumer: consumer,
Producer: producer,
SubmitServer: submitServer,
}
go submitFromLog.Run(context.Background())
Expand Down
Loading

0 comments on commit c9169ba

Please sign in to comment.