
services/horizon: Optimize trade aggregation #1639

Closed
wants to merge 10 commits

Conversation

@poliha (Contributor) commented Aug 22, 2019

This PR implements the solution to optimize the trade aggregation endpoints as discussed in #632

@ire-and-curses (Member) left a comment:

Thanks! A few comments, mostly for my understanding.

newStartTime = action.EndTimeFilter.ToInt64()
}
q = prevURL.Query()
q.Set("order", "desc")
Member:

Shouldn't this be `q.Set("order", "asc")`? If not, I think this needs an explanatory comment.

}
q = prevURL.Query()
q.Set("order", "asc")
Member:

Shouldn't this be `q.Set("order", "desc")`? If not, I think this needs an explanatory comment.


if action.PagingParams.Order == "asc" {
// next link
newStartTime := action.Records[len(action.Records)-1].Timestamp + action.ResolutionFilter
Member:

probably better named as nextStartTime

action.Page.Links.Next = hal.NewLink(nextURL.String())

// prev link
newStartTime = action.Records[0].Timestamp
Member:

probably better named as prevStartTime


// prev link
newStartTime = action.Records[0].Timestamp
if !action.EndTimeFilter.IsNil() && newStartTime >= action.EndTimeFilter.ToInt64() {
Member:

This is the previous (i.e. earlier in time) page, right? Shouldn't the start time also then be constrained by action.StartTimeFilter, if one was set? That would prevent records earlier than the filtered start time from being returned.

The case where this previous page would hit the EndTimeFilter seems strange to me. That would imply that both the current and the previous page were later than the specified end time. Do we want to return empty pages, or error in this situation? (curious about your thoughts here)

Contributor Author:

Here, I am following the convention the API uses to generate prev links, where the order is flipped in order to get the previous data for the supplied parameters. See the prev/next links here as an example: https://horizon.stellar.org/ledgers?order=desc

The case where this previous page would hit the EndTimeFilter seems strange to me. That would imply that both the current and the previous page were later than the specified end time. ....

This can happen if the EndTimeFilter is set to 0. We currently return empty pages.

This was a bit confusing for me to wrap my head around as well, particularly because of the way the previous links are constructed. I opted to default to the current API format, and to discuss this more when we talk about the changes we want to make to the API, which I believe you are currently working on.
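
For readers following the thread, here is a minimal sketch of that flipped-order convention; buildPageLinks and the start_time/end_time parameter names are illustrative stand-ins, not the PR's actual code:

package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// buildPageLinks sketches the convention discussed above: the next link
// keeps the requested order, while the prev link flips it so that
// following it pages backwards through the same records. firstTS and
// nextTS stand in for the boundary timestamps taken from the records.
func buildPageLinks(base *url.URL, order string, firstTS, nextTS int64) (next, prev string) {
	nextURL := *base
	q := nextURL.Query()
	q.Set("order", order)
	q.Set("start_time", strconv.FormatInt(nextTS, 10))
	nextURL.RawQuery = q.Encode()

	prevURL := *base
	q = prevURL.Query()
	// Flip the order, mirroring e.g. the prev link on
	// https://horizon.stellar.org/ledgers?order=desc
	if order == "asc" {
		q.Set("order", "desc")
	} else {
		q.Set("order", "asc")
	}
	q.Set("end_time", strconv.FormatInt(firstTS, 10))
	prevURL.RawQuery = q.Encode()

	return nextURL.String(), prevURL.String()
}

func main() {
	base, _ := url.Parse("https://horizon.stellar.org/trade_aggregations?resolution=3600000")
	next, prev := buildPageLinks(base, "asc", 1566432000000, 1566435600000)
	fmt.Println(next)
	fmt.Println(prev)
}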

}
return q, nil
}

Member:

I see that nothing beyond here runs if the query was descending, but this feels error prone if any of the returns above ever change. I think this would be better as two branches of an if, ideally abstracted outside of this function.
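
Something like this two-branch shape, sketched with hypothetical helper names and a placeholder Action type:

package main

type Action struct{ Order string }

func buildAscLinks(a *Action)  { /* next/prev for ascending queries */ }
func buildDescLinks(a *Action) { /* next/prev for descending queries */ }

// setPageLinks makes the asc/desc split explicit, so nothing depends on
// earlier returns silently skipping the rest of the function.
func setPageLinks(a *Action) {
	if a.Order == "asc" {
		buildAscLinks(a)
		return
	}
	buildDescLinks(a)
}

func main() { setPageLinks(&Action{Order: "desc"}) }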

return &TradeAggregationsQ{}, errors.Errorf("endtime(%d) is less than maximum resolution range(%d)", q.endTime, maxTimeRangeMillis)
}
adjustedTime = q.endTime - maxTimeRangeMillis
if q.startTime < adjustedTime {
Member:

This clipping of times seems similar to what was done in actions_trade. Is there a way to simplify by doing the clipping to boundaries only once?

Contributor Author:

I will look into this. But iirc, the clippings in actions_trade are done when generating the next/prev links from the returned results.

return q, nil
}

if q.endTime > adjustedTime {
Member:

Same comment about the time clipping - seems we are doing similar work again.


// SetPageLimit sets the number of records to be returned for weekly resolution queries to a maximum of
// 1 year(52 weeks). This is done to aid performance as querying for multiple years of data leads to slow queries.
func (q *TradeAggregationsQ) SetPageLimit() (*TradeAggregationsQ, error) {
Member:

The function name is very general for something that is specific to only one resolution.

@@ -0,0 +1,128 @@
package history
Member:

Was there anything that couldn't be tested? This seems like a small number of tests given the number of if blocks that were added.

Contributor Author:

These are just the initial tests; I will be adding more.

q := newUrl.Query()
action.Page.Links.Self = hal.NewLink(action.FullURL().String())
nextURL := action.FullURL()
prevURL := action.FullURL()
Contributor:

Why are you adding a prev link? It seems like a nice to have, but I don't see anything about your change that makes it more necessary than before.

Contributor Author:

Yes, it is just a nice-to-have so that it is similar to other endpoints. I can remove this.

@@ -315,6 +315,13 @@ var configOpts = []*support.ConfigOption{
FlagDefault: false,
Usage: "[EXPERIMENTAL] enables accounts for signer endpoint using an alternative ingest system",
},
&support.ConfigOption{
Contributor:

I'm not too excited about this option; it seems pretty specific, and it's not something that admins would really know how to set properly, or what the implications are of setting it vs. not setting it.

I think the approach of limiting the maximum number of records returned in certain cases in an unusual way (a limit of 52 rather than 200) introduces unexpected behavior that may cause bugs in clients. As a client, it's reasonable to conclude that if the query has a limit of 200 set, and then fewer than 200 results come back, that there are no more results. I think a meaningful amount of client code is written in this way.

Contributor Author:

This option was mainly necessitated by my tests. It was easier for me to just change the config and restart Horizon rather than rebuilding and redeploying.

// timeRangeOrderAsc generates the queries used in setting the startTime and endTime when they are not provided.
// Used when the records are to be returned in ascending order.
func timeRangeOrderAsc(q *TradeAggregationsQ) (string, string) {
startTime := minTradeTime(q.baseAssetID, q.counterAssetID)
Contributor:

Looking at the code, it seems like if the caller sets startTime but not endTime and does asc sort, this will always overwrite the startTime to be the earliest trade on this market. Shouldn't the caller's startTime be respected?

Contributor Author:

This would only be called when the startTime is set to zero. When a startTime is provided, it is respected.

Contributor:

Ah, you make a great point. From reading LimitTimeRange, it looks like it has an invariant after it's called: if the order is asc and startTime is set, then endTime will always be set. That means this function will only get called when startTime is zero.

I find it difficult to understand what's happening with the way this code is structured. I would probably have made the code that calls this check the invariants, to make it clearer to readers what's really going on. I'd try to be more explicit about when exactly this code gets called.

// get startTime and endTime from the history_trades table
startTime, endTime := generateTimeRangeQuery(q)
bucketSQL = bucketSQL.Where(fmt.Sprintf("ledger_closed_at >= %s", startTime))
bucketSQL = bucketSQL.Where(fmt.Sprintf("ledger_closed_at < %s", endTime))
Contributor:

using fmt.Sprintf like this for SQL is a dangerous habit, especially because the format string is %s.

Contributor Author:

Yes, I agree; I am reviewing how to do this with the support/db package now. When I wrote this, I looked at the github.com/Masterminds/squirrel package, which was already in use in this file, and this was the only way I found to add sub-queries.

Contributor:

Isn't an example use just two lines below:

bucketSQL = bucketSQL.Where(sq.GtOrEq{"ledger_closed_at": q.startTime.ToTime()})
bucketSQL = bucketSQL.Where(sq.Lt{"ledger_closed_at": q.endTime.ToTime()})
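
For reference, squirrel's Expr also allows a subquery string to keep ? placeholders, so the asset IDs stay bound arguments instead of being interpolated with %s. A minimal sketch, with illustrative table and column names:

package main

import (
	"fmt"

	sq "github.com/Masterminds/squirrel"
)

func main() {
	// The subquery text carries placeholders; the IDs are passed as
	// arguments, never spliced into the SQL string.
	minTradeTime := `(SELECT MIN(ledger_closed_at) FROM history_trades
	                   WHERE base_asset_id = ? AND counter_asset_id = ?)`

	bucketSQL := sq.Select("*").From("history_trades").
		Where(sq.Expr("ledger_closed_at >= "+minTradeTime, int64(2), int64(3)))

	sqlStr, args, err := bucketSQL.ToSql()
	fmt.Println(sqlStr, args, err)
}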

// generateTimeRangeQuery returns the formatted sql queries for calculating the startTime and endTime
// from the history_trades table. This is used when either the startTime or endTime is 0.
func generateTimeRangeQuery(q *TradeAggregationsQ) (string, string) {
if q.pagingParams.Order == "desc" {
Contributor:

I think @ire-and-curses is making a point about defensive programming. This function has an unstated invariant: q.pagingParams.Order should only be asc or desc. A defensive programming principle is that a function should assert or otherwise check for its invariants being violated. Particularly risky is the approach this code takes, where violating the invariant (say someone later introduces a new sort order, custom) results not only in no error but also in unexpected behavior (custom will be treated as asc).
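
A sketch of such a defensive check, with placeholder query strings standing in for the formatted subqueries the real function returns:

package main

import "fmt"

const (
	minTimeQuery = "(SELECT MIN(ledger_closed_at) FROM history_trades /* ... */)"
	maxTimeQuery = "(SELECT MAX(ledger_closed_at) FROM history_trades /* ... */)"
)

// generateTimeRangeQuery fails loudly on any order outside the asc/desc
// invariant, rather than silently treating unknown values as asc.
func generateTimeRangeQuery(order string) (startTime, endTime string, err error) {
	switch order {
	case "asc":
		return minTimeQuery, maxTimeQuery, nil
	case "desc":
		return maxTimeQuery, minTimeQuery, nil
	default:
		return "", "", fmt.Errorf("invalid paging order: %q", order)
	}
}

func main() {
	if _, _, err := generateTimeRangeQuery("custom"); err != nil {
		fmt.Println(err) // invalid paging order: "custom"
	}
}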

func leastTimestamp(ledgerTime strtime.Millis, resolution, offset, baseAssetID, counterAssetID, pageLimit int64) string {
adjustSeconds := ((pageLimit * resolution) + offset) / 1000
ledgerTimeInSeconds := ledgerTime.ToInt64() / 1000
return fmt.Sprintf(`(SELECT LEAST(TO_TIMESTAMP(%d) AT TIME ZONE 'UTC', TO_TIMESTAMP((extract(epoch from ledger_closed_at) + %d))AT TIME ZONE 'UTC') FROM %s as mltq)`, ledgerTimeInSeconds, adjustSeconds, minTradeTime(baseAssetID, counterAssetID))
Contributor:

This seems like it ignores any startTime the caller may have passed in. Shouldn't this compute min(q.endTime, q.startTime + adjustSeconds)?

Contributor Author:

This is used when q.startTime is zero.

if q.endTime < offsetMillis {
return &TradeAggregationsQ{}, errors.Errorf("endtime(%d) is less than offset(%d)", q.endTime, offsetMillis)
}
if q.endTime < maxTimeRangeMillis {
Contributor:

Yeah, q.pagingParams.Limit defaults to 200, right? It should be OK to request a slice like [epoch start, epoch start + 50 hours) where q.resolution = 1 hour without specifying limit explicitly. It seems like this check disallows that.

// leastTimestamp formats a sql select query to get the lesser of the provided timestamp or the
// adjustedTimestamp. Where adjustedTimestamp is calculated as
// (minTradeTime() + ((pageLimit * resolution) + offset))
func leastTimestamp(ledgerTime strtime.Millis, resolution, offset, baseAssetID, counterAssetID, pageLimit int64) string {
Contributor:

Functions like this with 6 arguments, 5 of which are the same type are error-prone. It's easy to reverse the order of two of these arguments and because the args are the same type, the type checker won't detect it. Better is to make this a member of TradeAggregationsQ, which already has access to all of these parameters except possibly for ledgerTime.
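
As a sketch of that refactor, leastTimestamp as a method would need only the ledger time as a parameter; the struct fields here are assumptions based on what's visible in this PR, and the SQL body is abbreviated:

package main

import "fmt"

type TradeAggregationsQ struct {
	baseAssetID    int64
	counterAssetID int64
	resolution     int64
	offset         int64
	pageLimit      int64
}

// leastTimestamp now takes everything but the ledger time from the
// receiver, so same-typed arguments can no longer be swapped silently.
// The full LEAST(...) subquery from the PR is elided here.
func (q *TradeAggregationsQ) leastTimestamp(ledgerTimeMillis int64) string {
	adjustSeconds := (q.pageLimit*q.resolution + q.offset) / 1000
	return fmt.Sprintf("/* LEAST of TO_TIMESTAMP(%d) and first trade + %d s */",
		ledgerTimeMillis/1000, adjustSeconds)
}

func main() {
	q := &TradeAggregationsQ{resolution: 3600000, pageLimit: 200}
	fmt.Println(q.leastTimestamp(1566432000000))
}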

Contributor Author:

Sounds good, will look into it.

}
adjustedTime = q.endTime - maxTimeRangeMillis
if q.startTime < adjustedTime {
q.startTime = adjustedTime
Contributor:

I think this changes the behavior of the trade aggregations query. In the current query, if there are gaps in the buckets (which happens frequently, when there are no trades in a bucket's time interval), then potentially many buckets with timestamp earlier than adjustedTime will be returned.

We don't explicitly promise this is the behavior, but changing it is probably going to be a breaking change for some exchanges. This is the kind of implication that should be brought up and discussed. I didn't see any mention of it in the PR, and it makes me worry that we would have merged this change in behavior silently.

Contributor Author:

My mistake was assuming that it was clear from my proposal that this changes the behaviour of the endpoint.
You are right, the changes here will only return responses for timestamps within the adjustedTime, and would require users to fetch the next set of records using the next link.
I am still in the process of testing it out, but potentially this can lead to the endpoint returning empty records for some time periods. In this scenario, my suggestion would be to generate the next link with a startTime or endTime that is beyond the adjustedTime.

I think that this is a better trade-off than calculating buckets for all time and returning just a few records. Given that this is experimental, I was not expecting this to be merged right away, giving us ample time to inform users of the new changes.

func greatestTimestamp(ledgerTime strtime.Millis, resolution, offset, baseAssetID, counterAssetID, pageLimit int64) string {
adjustSeconds := ((pageLimit * resolution) + offset) / 1000
ledgerTimeInSeconds := ledgerTime.ToInt64() / 1000
return fmt.Sprintf(`(SELECT GREATEST(TO_TIMESTAMP(%d) AT TIME ZONE 'UTC', TO_TIMESTAMP((extract(epoch from ledger_closed_at) - %d))AT TIME ZONE 'UTC') FROM %s as mltq)`, ledgerTimeInSeconds, adjustSeconds, maxTradeTime(baseAssetID, counterAssetID))
Contributor:

I think this becomes a very complicated query. If I understand your comment correctly, we need to bound the time queries in the subquery. Maybe instead of adding more things to a single query, we could split this into multiple queries executed inside the trade aggregations function in the db package. So the algorithm would be something like this:

  1. If start_time is not provided, find the first trade for the given market (let's call this query FT, for first trade).
  2. If end_time is not provided, then: end_time = start_time + limit * bucket size. If end_time is provided, make sure this computed value does not exceed it.
  3. Insert the values into the query: ledger_closed_at >= 'start_time' AND ledger_closed_at < 'end_time'.

For order=desc just flip the values (if not set: start_time = now and end_time = start_time - limit * bucket size). That way you don't need to do a lot of query concatenation. Please let me know if I'm missing anything here.

If my thinking is wrong could you add a very short summary of changes in this PR that could help me better understand it?
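
A minimal sketch of the split-query flow proposed above, under the assumption that the first-trade lookup becomes its own DB call; firstTradeTime and resolveRange are hypothetical names:

package main

import "fmt"

// firstTradeTime stands in for query FT; in real code this would be a
// separate SELECT against history_trades for the given market.
func firstTradeTime() int64 { return 1546300800000 }

func resolveRange(startTime, endTime, limit, resolution int64) (int64, int64) {
	if startTime == 0 {
		startTime = firstTradeTime() // step 1: query FT
	}
	maxEnd := startTime + limit*resolution // step 2
	if endTime == 0 || endTime > maxEnd {
		endTime = maxEnd // cap at one page of buckets
	}
	return startTime, endTime // step 3: bind as plain timestamps
}

func main() {
	fmt.Println(resolveRange(0, 0, 200, 3600000))
}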

Contributor Author:

Okay, splitting might bring more clarity. Hmm, I think if we flip the values for order=desc we might end up with data outside the specified range, given the way the query is currently set up. Your suggestion gives me an idea for rewriting the query, though.

Here is a summary of what this PR does to the startTime and endTime values:
First, calculate maxTimeRange = (limit * resolution) + offset.

if startTime is zero

  • startTime = first trade for the asset pair

if endTime is zero

  • endTime = last trade for the asset pair

If order is asc

  • startTime is not changed
  • adjustedTime = startTime + maxTimeRange
  • endTime = min(endTime, adjustedTime)

If order is desc

  • endTime is not changed
  • adjustedTime = endTime - maxTimeRange
  • startTime = max(startTime, adjustedTime)
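
Transcribed into Go for clarity, the summary above amounts to something like this sketch (variable names follow the summary, not necessarily the PR's identifiers; the zero-value substitution of first/last trade times is assumed to have happened already):

package main

import "fmt"

func adjustTimeRange(startTime, endTime, limit, resolution, offset int64, order string) (int64, int64) {
	maxTimeRange := limit*resolution + offset
	if order == "asc" {
		// startTime is not changed; endTime = min(endTime, startTime + maxTimeRange)
		if adjusted := startTime + maxTimeRange; endTime > adjusted {
			endTime = adjusted
		}
	} else {
		// endTime is not changed; startTime = max(startTime, endTime - maxTimeRange)
		if adjusted := endTime - maxTimeRange; startTime < adjusted {
			startTime = adjusted
		}
	}
	return startTime, endTime
}

func main() {
	fmt.Println(adjustTimeRange(0, 1000000000, 200, 3600000, 0, "desc"))
}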

Contributor:

Thanks @poliha this is much more clear now. One question:

if endTime is zero

  • endTime = last trade for the asset pair

I'm wondering if in such a case endTime should be equal to the last ledger close time. I think client code will be much simpler this way. Let's say that I want to display the last day/week/month of data in a chart. If there were no trades recently, I'd need to manually check this and add recent data (with 0s) and remove older data (outside the time range) from the response.

Your suggestion gives me an idea to rewrite the query though.

Please add a comment here when your changes are implemented.

@ire-and-curses (Member):
@poliha I'm going to close this PR for now. It will be referenced when we return to this work.
