This repository has been archived by the owner on May 27, 2024. It is now read-only.

feat: append OTP info to output #12

Merged
merged 39 commits into from
Feb 9, 2022

Conversation

BumbleFlash
Collaborator

@BumbleFlash BumbleFlash commented Apr 20, 2021

PR to append the output to the Chicago ride-hailing data, discussed in #11. (WIP)

TODO:

  • Review whether TncToGtfsWriter should be removed (or whether we'll use it as-is). Either way, update the documentation to make it clearer what the parameters do

@BumbleFlash BumbleFlash requested a review from barbeau April 20, 2021 22:49
@BumbleFlash
Collaborator Author

@barbeau This is a working sample using Kotlin flow that converts the callbacks to suspending functions. I'm still figuring out how to make batches of requests and zip them. In all the examples here, they create n variables for n requests and then zip them at the end.

Also, the CI is gonna fail because the library is only in mavenLocal

@BumbleFlash
Collaborator Author

BumbleFlash commented Apr 21, 2021

Update: @barbeau I pushed a change that is still synchronous, but it's way faster than it used to be. I'm transforming the Chicago list to a flow, using the built-in filter and collect operators to remove invalid location values and perform the callbacks on the IO thread.

Here's how I measured the efficiency:
I measured the elapsed time between the start and end of the API callback, using the getPlanData() function in the first commit 760dce9 and the makePlanFlow function in the second commit, and noted the results.
You'll see that converting the list to a flow is more consistent and faster than looping over it ourselves. Also, I'm calling the API in the Dispatchers.IO context; however, the printing still uses the main thread.

|            | getPlanData() | makePlanFlow() |
|------------|---------------|----------------|
| Request 2  | 23ms          | 25ms           |
| Request 3  | 45ms          | 30ms           |
| Request 4  | 20ms          | 27ms           |
| Request 5  | 89ms          | 16ms           |
| Request 6  | 76ms          | 20ms           |
| Request 7  | 30ms          | 23ms           |
| Request 8  | 57ms          | 19ms           |
| Request 9  | 21ms          | 29ms           |
| Request 10 | 29ms          | 21ms           |
| Average    | 43.33ms       | 23.33ms        |

pom.xml review thread (outdated, resolved)
@barbeau
Member

barbeau commented Apr 21, 2021

Nice! Could you add an average value at the bottom of each column? That should make it easier to see the differences.

@barbeau
Member

barbeau commented Apr 21, 2021

Also, typically you'd throw out the first value because of JIT - it will always be much larger than the rest because some of the code gets compiled on the fly.

@BumbleFlash
Collaborator Author

@barbeau Apologies, I just realized I was feeding two longitude values. It looks like it doesn't affect the processing times, although the printing takes longer since the responses actually have data to write.

@BumbleFlash
Collaborator Author

@barbeau I'm having trouble emitting multiple values and collecting them as one, which is what we're trying to do given that the order of the data doesn't matter. However, the Kotlin flow manages to work in sync even if I emit the value from a different coroutine scope and collect it from the main one. It's like it won't proceed to the next emit() before collect().

@barbeau
Member

barbeau commented Apr 22, 2021

However, the Kotlin flow manages to work in sync even if I emit the value from a different coroutine scope and collect it from the main. It's like it won't proceed to the next emit() before collect()

If I'm understanding correctly, I think that's expected behavior - emits() only execute when they are collected. So all the emits() before the first collect() should execute in parallel on the call to the first collect(). But after that, if you try to emit() again, it won't execute until that flow is collected.

For the producer/consumer problem, have you looked at Channels?

https://kotlinlang.org/docs/channels.html#building-channel-producers
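For reference, the producer/consumer pattern from those docs looks roughly like this. This is a minimal sketch using kotlinx.coroutines; the names and the string payloads are illustrative, not from this PR:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Minimal producer/consumer sketch: producers send results into a channel,
// a consumer reads them as they arrive, independent of order.
fun main() = runBlocking {
    val results = Channel<String>()

    // Producers: e.g. concurrent OTP requests.
    val producers = (1..3).map { id ->
        launch {
            results.send("result-$id") // suspends until a consumer is ready
        }
    }

    // Consumer: e.g. the CSV writer, handling results as they complete.
    launch {
        repeat(3) {
            println(results.receive())
        }
    }

    producers.forEach { it.join() }
    results.close()
}
```

Unlike a cold flow, a channel decouples the producers from the consumer, so the writes don't have to wait for a particular collect() call.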

To step back a minute, I think we have two things we're trying to accomplish for out-of-order execution:

  1. Parallel execution - All OTP requests and writes to CSV shouldn't block each other (unless required on actual CSV file access to ensure file integrity)
  2. Batching execution - We could have X producers (OTP requests) and Y consumers (CSV writes) in parallel

The flow solution at https://stackoverflow.com/questions/60551996/wait-for-result-from-multiple-callbacks-lambdas-in-kotlin/60556171#60556171 is close, but if you're emitting 10 requests and then collecting them, I believe you'll need to wait for all 10 to finish before batching the next 10 (unless you have more than one flow active at a time).

To keep this simple and make some progress towards a working solution, let's just implement the above flow solution first that blocks until the X requests finish, and then we can work on a parallel producer/consumer implementation in another PR.
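A hedged sketch of that "block until the X requests finish" approach: process the input in fixed-size batches, waiting for each batch to complete before starting the next. `makeRequest` is a stand-in for the real OTP call, and the batch size is illustrative:

```kotlin
import kotlinx.coroutines.*

// Stand-in for the real OTP plan request.
suspend fun makeRequest(i: Int): String {
    delay(10) // simulate network latency
    return "plan-$i"
}

fun main() = runBlocking {
    val batchSize = 10
    (1..25).chunked(batchSize).forEach { batch ->
        // Launch the whole batch in parallel, then wait for all of it to finish
        // before moving on to the next batch.
        val results = batch.map { async(Dispatchers.IO) { makeRequest(it) } }.awaitAll()
        results.forEach(::println) // e.g. write this batch to CSV
    }
}
```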

@BumbleFlash
Collaborator Author

BumbleFlash commented Apr 22, 2021

@barbeau Thanks for the explanation. To clarify, should we try to emit multiple calls and collect the response whenever it gets one, irrespective of the order?

However, I'm having trouble dynamically creating multiple flows and combining them as one to collect them later on the thread.

@barbeau
Member

barbeau commented Apr 22, 2021

Ok - can you push that code (and comment out or remove other unused code for now) and I can take a look?

@BumbleFlash
Collaborator Author

@barbeau I finally figured out the solution to this problem. It was right in front of me all along. I'm converting the list to a flow and passing another flow to the flatMapMerge function, which calls the API. flatMapMerge transforms each element into a flow and collects those flows concurrently. I guess the solution was a combination of getPlanData() and makePlanFlow().

```kotlin
runBlocking {
    chicagoTncData
        .asFlow()
        // Drop records with invalid pickup/dropoff coordinates
        .filter {
            isValidLocation(
                it.pickupCentroidLatitude,
                it.pickupCentroidLongitude,
                it.dropoffCentroidLatitude,
                it.dropoffCentroidLongitude
            )
        }
        // Run up to 10 plan requests in parallel, merging results as they complete
        .flatMapMerge(concurrency = 10) {
            flow {
                val origin = GtfsUtils.latLong(
                    it.pickupCentroidLatitude,
                    it.pickupCentroidLongitude
                )
                val destination = GtfsUtils.latLong(
                    it.dropoffCentroidLatitude,
                    it.dropoffCentroidLongitude
                )
                val requestParameters = RequestParameters(fromPlace = origin, toPlace = destination)
                val planApi = PlanApi(url, requestParameters)
                emit(makePlanRequest(planApi, chicagoTncData.indexOf(it)))
            }
        }
        .collect {
            println(it.additionalProperties)
        }
}
```

The concurrency parameter is the maximum number of requests to be executed in parallel. We should be able to make it dynamic by passing it in the function signature.

I tested the concurrency by appending the index of the data to the additional properties map in the Planner model and the collection was not in order of the data. So YAY!
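Making the concurrency level dynamic could look something like the following. This is a hypothetical signature sketch, not code from the PR; `fetch` stands in for the real OTP request:

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList

// Sketch: the parallelism cap becomes a caller-tunable parameter
// instead of a hard-coded constant.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun <T, R> processConcurrently(
    items: List<T>,
    concurrency: Int = 10,       // maximum requests in flight at once
    fetch: suspend (T) -> R
): List<R> =
    items.asFlow()
        .flatMapMerge(concurrency) { item -> flow { emit(fetch(item)) } }
        .toList()                // order reflects completion, not input
```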

@BumbleFlash BumbleFlash marked this pull request as ready for review April 22, 2021 21:57
@BumbleFlash BumbleFlash requested a review from barbeau April 22, 2021 21:57
@BumbleFlash
Collaborator Author

@barbeau I have updated the ChicagoTncData model and added TODOs to the collect block on how to fill in the output information. Should we merge this PR and open another for the output file?

```diff
@@ -38,4 +39,11 @@ data class ChicagoTncData
 @Parsed val pickupCentroidLongitude: Double = 0.0,
 @Parsed val dropoffCentroidLatitude: Double = 0.0,
 @Parsed val dropoffCentroidLongitude: Double = 0.0,
+@Parsed var totalTravelTime: Int? = 0,
```
Member


I believe we want these metrics for the top 3 trip plan results, IIRC? So for this to output cleanly to CSV I think we'd need to label these "...1", and then have "...2", and "...3" for all the fields.

Collaborator Author


Ahh - I get it. What if there are less than 3 ways to reach a destination? I guess they'll hold the default values then.

Collaborator Author


Quick clarification, should the top 3 be as-is or should we sort by travel time?

Member


Quick clarification, should the top 3 be as-is or should we sort by travel time?

I would leave them sorted as-returned by OTP, because they are ordered by OTP's preference.

We should probably somehow include the priority preference used in the OTP requests, maybe as a new model field: QUICK, SAFEST, etc. This obviously impacts the sorting.

Collaborator Author


Right, a common model field for all 3 trip plan results that shows what the OptimizeType was.
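One possible shape for the CSV-friendly model discussed above, purely illustrative: suffixed fields for the top 3 itineraries plus a shared optimize-type field. The field names are assumptions, not from the PR; @Parsed is the univocity-parsers annotation the model already uses, and the defaults cover trips with fewer than 3 itineraries:

```kotlin
import com.univocity.parsers.annotations.Parsed

// Illustrative sketch only, not the actual PR model.
data class TripPlanOutput(
    @Parsed val optimizeType: String? = null,  // QUICK, SAFEST, etc.; shared by all 3 results
    @Parsed var totalTravelTime1: Int? = 0,    // metrics for itinerary 1 (OTP order)
    @Parsed var totalTravelTime2: Int? = 0,    // itinerary 2, default if absent
    @Parsed var totalTravelTime3: Int? = 0     // itinerary 3, default if absent
)
```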

@barbeau
Member

barbeau commented Apr 25, 2021

Should we merge this PR and open another for the output file?

I'd say let's do it all in this PR. We probably won't know for sure that the model objects are correct until we actually output it.

@BumbleFlash BumbleFlash requested a review from barbeau April 30, 2021 23:02
@barbeau
Member

barbeau commented Feb 9, 2022

CI is failing because CUTR-at-USF/opentripplanner-client-library#14 hasn't been resolved (that library artifact hasn't been published yet), but I'm going to merge anyway as this is a large PR and the main feature of this application. I'll open an issue separately for getting CI working again.

@barbeau barbeau merged commit 3b8ec66 into main Feb 9, 2022
@barbeau barbeau deleted the otp branch February 9, 2022 20:46