Skip to content

Commit

Permalink
Merge pull request #182 from microsoft/chgeuer/diagnostics
Browse files Browse the repository at this point in the history
More diagnostics tooling
  • Loading branch information
chgeuer authored Sep 14, 2023
2 parents 2423139 + ed3e804 commit 98a32a1
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 19 deletions.
19 changes: 19 additions & 0 deletions src/Metering.BaseTypes/MeteringUpdateEvent.fs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,25 @@ type MeteringUpdateEvent =
| RemoveUnprocessedMessages _ -> ""
| Ping x -> x.PartitionID.value

member this.MarketplaceResourceID
with get() : MarketplaceResourceId option =
match this with
| SubscriptionPurchased x -> x.Subscription.MarketplaceResourceId |> Some
| SubscriptionDeletion x -> x |> Some
| UsageReported x -> x.MarketplaceResourceId |> Some
| UsageSubmittedToAPI x ->
match x.Result with
| Ok x -> x.RequestData.MarketplaceResourceId |> Some
| Error e ->
match e with
| DuplicateSubmission d -> d.FailedRequest.MarketplaceResourceId |> Some
| ResourceNotFound d -> d.RequestData.MarketplaceResourceId |> Some
| Expired d -> d.RequestData.MarketplaceResourceId |> Some
| Generic d -> d.RequestData.MarketplaceResourceId |> Some
| UnprocessableMessage _ -> None
| RemoveUnprocessedMessages _ -> None
| Ping x -> None

override this.ToString() =
match this with
| SubscriptionPurchased x -> x.ToString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ open System.Text.RegularExpressions
// and stores the events and the state after applying a certain event, as JSON files in the file system.
//

let readAllEvents<'TEvent> (directory: string) (blobPrefix: string) (convert: EventData -> 'TEvent) (partitionId: PartitionID) (cancellationToken: CancellationToken) : IEnumerable<EventHubEvent<'TEvent>> =
let readAllEvents<'TEvent> (captureDirectory: string) (blobPrefix: string) (convert: EventData -> 'TEvent) (partitionId: PartitionID) (cancellationToken: CancellationToken) : IEnumerable<EventHubEvent<'TEvent>> =
seq {
// p0--2023-08-22--19-57-36.avro
let extractTime (captureFileNameFormat: string) (partitionId: PartitionID) (blobName: string) : MeteringDateTime option =
Expand Down Expand Up @@ -45,7 +45,7 @@ let readAllEvents<'TEvent> (directory: string) (blobPrefix: string) (convert: Ev
let time = extractTime "p{PartitionId}--{Year}-{Month}-{Day}--{Hour}-{Minute}-{Second}" partitionId

let files =
Directory.GetFiles(path = directory, searchPattern = "*.avro") // all avro files
Directory.GetFiles(path = captureDirectory, searchPattern = "*.avro") // all avro files
|> Array.map (fun filename -> (filename, time filename)) // extract time from filename
|> Array.choose (fun (filename, time) -> time |> Option.map (fun t -> (filename, t))) // filter out files without time
|> Seq.sortBy (fun (_, time) -> time.LocalDateTime) // sort by time
Expand All @@ -61,34 +61,137 @@ let readAllEvents<'TEvent> (directory: string) (blobPrefix: string) (convert: Ev
|> Seq.map (fun e -> EventHubEvent.createFromCapture (convert e) (RehydratedFromCaptureEventData.getMessagePosition e) None blobName )
}

let recreateLatestState directory partitionId stateFile =
let recreateLatestState captureDirectory partitionId stateFile =
File.WriteAllText(
path = stateFile,
contents =
(readAllEvents<MeteringUpdateEvent>
directory
captureDirectory
"whatever"
CaptureProcessor.toMeteringUpdateEvent
partitionId
CancellationToken.None
|> Seq.scan
MeterCollectionLogic.handleMeteringEvent
MeterCollection.Empty
//|> Seq.map (fun state ->
// match state.LastUpdate with
// | None -> ()
// | Some mu ->
// if (int mu.SequenceNumber) % 100 = 0
// then printfn "%d" (mu.SequenceNumber)
// else ()
// state
//)
|> Seq.map (fun state ->
match state.LastUpdate with
| None -> ()
| Some mu ->
if (int mu.SequenceNumber) % 1000 = 0
then printf "%d " (mu.SequenceNumber)
else ()
state
)
|> Seq.last
|> Json.toStr 2))
|> Json.toStr 0))

// Processing partion 0 consumed 1.0 GB, partition 1 consumed 3.5 GB, and partition 2 consumed 1.7 GB of memory
let directory = @"..\..\..\..\..\..\testcaptures"
let partitionId = PartitionID.create "0"
let stateFile = $"..\..\..\..\..\..\state{ partitionId.value }.json"
let expandCaptureToEventsInIndividualJsonFiles captureDirectory partitionId eventDirectory =
if Directory.Exists(eventDirectory)
then
Directory.CreateDirectory(path = eventDirectory) |> ignore

recreateLatestState directory partitionId stateFile
readAllEvents<MeteringUpdateEvent>
captureDirectory
"whatever"
CaptureProcessor.toMeteringUpdateEvent
partitionId
CancellationToken.None
|> Seq.filter (fun e ->
match e.EventData with
| UsageReported _ -> false
| Ping _ -> false
| _ -> true)
|> Seq.iter(fun e ->
let filename = $"{eventDirectory}\\p{e.MessagePosition.PartitionID.value}--{e.MessagePosition.SequenceNumber}--{e.MessagePosition.PartitionTimestamp |> MeteringDateTime.blobName}--{e.EventData.MessageType}.json"
let content = e.EventData |> Json.toStr 2

File.WriteAllText(
path = filename,
contents = content)
)

let printEventsWithoutPartitionKey captureDirectory partitionId =
readAllEvents<MeteringUpdateEvent>
captureDirectory
"whatever"
CaptureProcessor.toMeteringUpdateEvent
partitionId
CancellationToken.None
|> Seq.filter (fun e -> e.MessagePosition.PartitionID.value = null)
|> Seq.iter(fun e ->
printf "%d " e.MessagePosition.SequenceNumber
)

let oneOrMoreEventsHaveAPartitionID captureDirectory partitionId : bool =
readAllEvents<MeteringUpdateEvent>
captureDirectory
"whatever"
CaptureProcessor.toMeteringUpdateEvent
partitionId
CancellationToken.None
|> Seq.exists (fun e -> e.MessagePosition.PartitionID.value <> null)

let numberOfMessages captureDirectory partitionId =
readAllEvents<MeteringUpdateEvent>
captureDirectory
"whatever"
CaptureProcessor.toMeteringUpdateEvent
partitionId
CancellationToken.None
|> Seq.length

let marketplaceResourceIDs captureDirectory partitionId =
readAllEvents<MeteringUpdateEvent>
captureDirectory
"whatever"
CaptureProcessor.toMeteringUpdateEvent
partitionId
CancellationToken.None
|> Seq.choose (fun e -> e.EventData.MarketplaceResourceID)
|> Seq.distinct
|> Seq.sort

let numberOfUnprocessedMessagesInStateFile stateFile =
let state = File.ReadAllText(stateFile) |> Json.fromStr<MeterCollection>
state.UnprocessableMessages |> List.length

let numberOfPartitions = 3
let captureDirectory = @"..\..\..\..\..\..\testcaptures"

[ 0 .. numberOfPartitions - 1 ]
|> Seq.map(sprintf "%d")
|> Seq.map(PartitionID.create)
|> Seq.iter(fun partitionId ->
// Processing partion 0 consumed 1.0 GB, partition 1 consumed 3.5 GB, and partition 2 consumed 1.7 GB of memory

//// Start with an empty state and apply all events, to get the latest state
let stateFile = $"{captureDirectory}\state\state-p{ partitionId.value }.json"

// recreateLatestState captureDirectory partitionId stateFile

// printfn "Statefile %s has %d unprocessed messages" stateFile (numberOfUnprocessedMessagesInStateFile stateFile)

//// Expand the capture to individual JSON files, one for each event
//expandCaptureToEventsInIndividualJsonFiles
// captureDirectory
// partitionId
// $"{captureDirectory}\events\{partitionId.value}"

//// Print the sequence numbers of all events that do not have a partition key
//printEventsWithoutPartitionKey
// captureDirectory
// partitionId

//// Check if there are any events that do not have a partition key
//if oneOrMoreEventsHaveAPartitionID captureDirectory partitionId
//then printfn $"Partition {partitionId.value} has some events that *DO* have a partition key"
//else printfn $"No event in partition {partitionId.value} has a partition key"


// printfn $"Partition {partitionId.value} has {numberOfMessages captureDirectory partitionId} messages"

// Print all marketplace resource IDs by partition
marketplaceResourceIDs captureDirectory partitionId
|> Seq.iter (fun id -> printfn $"Partition {partitionId.value} has marketplace resource ID {id.ToString()}" )
)

0 comments on commit 98a32a1

Please sign in to comment.