Skip to content
This repository has been archived by the owner on Feb 8, 2018. It is now read-only.

backfill the status, route, and ref columns of the exchanges table #2779

Closed
chadwhitacre opened this issue Sep 23, 2014 · 67 comments
Closed

Comments

@chadwhitacre
Copy link
Contributor

Right now the exchanges.status column has NULL values. It would be great to constrain that column to non-NULL, but it would take some effort with the Balanced API to determine the proper value for previous exchanges. IRC

Want to back this issue? Post a bounty on it! We accept bounties via Bountysource.

@Changaco
Copy link
Contributor

Related to #2443 and #364.

@rohitpaulk
Copy link
Contributor

There are 37557 exchanges with NULL status values. Do all of these necessarily have to be present in Balanced? Could they also be old exchanges from Stripe or so?

@rohitpaulk
Copy link
Contributor

I've also noticed a few payouts with a status of 'pending' that have modified one's balance.

EDIT: My bad, I didn't realize that we deducted balance for payouts with a status of pre/pending

@Changaco Changaco changed the title backfill exchanges.status column backfill the status and route columns of the exchanges table Mar 25, 2015
@Changaco Changaco added this to the Balanced shutdown milestone Mar 25, 2015
@Changaco
Copy link
Contributor

Added to the Balanced shutdown milestone.

@chadwhitacre
Copy link
Contributor Author

We have a user getting bitten by #3302, which we decided to fix with this ticket.

https://gratipay.freshdesk.com/helpdesk/tickets/1867

@chadwhitacre
Copy link
Contributor Author

I'm working on manually fixing this for the 1867 case, and am blocking payday (#3307) on it.

@chadwhitacre
Copy link
Contributor Author

IRC

@chadwhitacre
Copy link
Contributor Author

@chadwhitacre
Copy link
Contributor Author

We are expecting one of the old deposits to fail, at which time Balanced will post back so we can update exchanges.status. We'll need to insert the old exchange_route before we can connect the exchange to it. For now I'm only worrying about the most recent failure.

@chadwhitacre
Copy link
Contributor Author

I set exchanges.route for the exchange in question and clicked "Replay" on the Balanced dashboard:

https://dashboard.balancedpayments.com/#/marketplaces/MP12Xw5lL6iaILtqImIoroDL/events/EVfe85f7b4d70211e4b84a0663eb3e293e

It's still in the "retrying" state, and I'm not seeing the status updated on the user's History page.

@chadwhitacre
Copy link
Contributor Author

Here's the error in Sentry (cf. #3302):

https://app.getsentry.com/gratipay/gratipay-com/group/60437761/

@chadwhitacre
Copy link
Contributor Author

I'm not getting good visibility into the status of the webhook postback. From Sentry it looks like we haven't generated any errors since I hit "Replay," but Sentry samples (right? how?), so not every request is recorded. If the postback works then there also won't be a Sentry entry. The absence of a Sentry entry either means it worked or it didn't but Sentry isn't showing us the specific failure instance.

All we get in the Balanced dashboard is this:

screen shot 2015-04-02 at 5 08 53 pm

@chadwhitacre
Copy link
Contributor Author

Using the Chrome inspector I've found the json endpoint for callbacks, which includes this:

      "attempts_remaining": 9,
      "attempts": 1,
      "attempt_limit": 10,

@chadwhitacre
Copy link
Contributor Author

the callback will be retried up to 10 times with an initial retry delay of 10 minutes which will increase exponentially with each failure for the next 7 days.

https://docs.balancedpayments.com/1.1/api/callbacks/

@chadwhitacre
Copy link
Contributor Author

Weeeeeeeelllllll, I clicked Replay more than 10 minutes ago, and it's still reporting as retrying with 9 attempts left.

@chadwhitacre
Copy link
Contributor Author

So I could update the user's balance manually, but I don't see that I can cancel the callback. :-(

@chadwhitacre
Copy link
Contributor Author

If I run record_exchange_result manually and then start payday, and the callback succeeds while payday is running, then we will send the user twice as much money as we should. That'd be a mess to clean up.

@chadwhitacre
Copy link
Contributor Author

But at this point we're down to the wire on payday.

@chadwhitacre
Copy link
Contributor Author

"raise TooMany" should go to zero with #3806, and we'll handle "No route for" by creating routes on the fly while looping.

@chadwhitacre
Copy link
Contributor Author

#!/usr/bin/env python -u
from __future__ import absolute_import, division, print_function, unicode_literals

import json, os, sys, traceback
from gratipay import wireup
from decimal import Decimal as D
from pprint import pprint

db = wireup.db(wireup.env())
dirname = os.path.join(os.path.dirname(__file__), 'transactions')
i = 0
I = float('inf')
class Done(Exception): pass

class ExchangeMismatch(Exception):
    def __str__(self):
        return "{} {} {}\n\n{}".format(*self.args)
class NoExchange(ExchangeMismatch): pass
class AmbiguousExchange(ExchangeMismatch): pass


def process_thing(cur, thing):
    print('{} -> '.format(thing['description']), end='')

    participants = cur.all( "SELECT username FROM participants "
                            "WHERE balanced_customer_href='/customers/'||%s"
                          , (thing['links']['customer'],)
                           )
    print('{} | '.format(', '.join(participants)), end='')

    route = cur.one( "SELECT * FROM exchange_routes WHERE address='/cards/'||%s"
                   , (thing['links']['source'],)
                    )
    if route is None:
        if not participants:
            print("No route for {}.".format(thing['links']['source']))
        else:
            print("No route for {}.".format(thing['links']['source']))
        return

    # Use the username that is current as of this db transaction.
    username = cur.one( "SELECT username FROM participants WHERE id=%s"
                      , (route.participant,)
                       )

    params = dict( status   = thing['status']
                 , amount   = D(thing['amount']) / 100
                 , ts       = thing['created_at']
                 , route_id = route.id
                 , username = username
                  )

    nalready = cur.one("""\
        SELECT count(*) FROM exchanges
         WHERE status = %(status)s
           AND route = %(route)s
           AND participant = %(username)s
           AND amount + fee = %(amount)s
           AND ((   (%(ts)s::timestamptz - "timestamp") < interval '0'
                AND (%(ts)s::timestamptz - "timestamp") > interval '-15m'
                ) OR (
                    (%(ts)s::timestamptz - "timestamp") > interval '0'
                AND (%(ts)s::timestamptz - "timestamp") < interval '15m'
                ))
    """, dict(params, route=route.id))
    if nalready == 1:
        print("Already linked!")
        return

    mogrified = cur.mogrify("""\
    WITH rows AS (
        UPDATE exchanges
           SET status = %(status)s
             , route = %(route_id)s
         WHERE (status is NULL or status = %(status)s)
           AND route is NULL
           AND participant = %(username)s
           AND amount + fee = %(amount)s
           AND ((   (%(ts)s::timestamptz - "timestamp") < interval '0'
                AND (%(ts)s::timestamptz - "timestamp") > interval '-15m'
                ) OR (
                    (%(ts)s::timestamptz - "timestamp") > interval '0'
                AND (%(ts)s::timestamptz - "timestamp") < interval '15m'
                ))
     RETURNING 1 AS count
    ) SELECT * FROM rows;
    """, params)
    print("Updating exchanges for {}: ".format(username), end='')
    nupdated = cur.one(mogrified, default=0)
    Error = None
    if nupdated == 0:
        Error = NoExchange
    elif nupdated > 1:
        Error = AmbiguousExchange
    if Error:
        raise Error(params['username'], params['amount'], params['ts'], mogrified)
    print("{}.".format(nupdated))


with db.get_cursor() as cur:
    try:
        for filename in reversed(sorted(os.listdir(dirname))):
            data = json.load(open(os.path.join(dirname, filename)))

            for key in data:
                if key in ('links', 'meta'): continue
                things = reversed(data[key])

                if key == 'card_holds':
                    continue  # We don't track these.

                if key != 'debits':
                    continue  # Let's start with credit card charges.

                for thing in things:
                    print("{} | ".format(thing['created_at']), end='')

                    try:
                        process_thing(cur, thing)
                    except KeyboardInterrupt:
                        raise
                    except:
                        print("Exception! Thing:")
                        pprint(thing)
                        print('-'*78)
                        traceback.print_exc(file=sys.stdout)
                        print('='*78)

                    if i > I: raise Done()
                    i += 1
    except Done:
        pass
    print()
    raise Exception  # trigger rollback

@chadwhitacre
Copy link
Contributor Author

Once this is done we can drop column exchanges.participant, since we'll have a reference through exchange_routes. That'll knock one out for #835.

@chadwhitacre
Copy link
Contributor Author

Alright, so after some poking on #3806, I'm now thinking that we should pick one of the duplicates rather than deduplicating.

@chadwhitacre
Copy link
Contributor Author

Working on this at Blue Canary with @kaguillera. Caught him up to speed, and now diving back in. We just discovered that later transactions in Balanced have an exchange_id and a participant_id. Looks like it starts around August, 2014. Let's see how much ambiguity that removes ...

@chadwhitacre
Copy link
Contributor Author

We just verified that every transaction with a exchange_id also has a participant_id.

@chadwhitacre
Copy link
Contributor Author

GitHub comments as version control. 🐱

#!/usr/bin/env python -u
from __future__ import absolute_import, division, print_function, unicode_literals

import json, os, sys, traceback
from gratipay import wireup
from decimal import Decimal as D
from pprint import pprint
from gratipay.models.exchange_route import ExchangeRoute
from gratipay.models.participant import Participant

db = wireup.db(wireup.env())
dirname = os.path.join(os.path.dirname(__file__), 'transactions')
i = 0
I = float('inf')
class Done(Exception): pass

class ExchangeMismatch(Exception):
    def __str__(self):
        return "{} {} {}\n\n{}".format(*self.args)
class NoExchange(ExchangeMismatch): pass
class AmbiguousExchange(ExchangeMismatch): pass


def get_participant(cur, customer_id, old_username):
    participant = cur.one("SELECT participants.*::participants FROM participants "
                          "WHERE balanced_customer_href='/customers/'||%s", (customer_id,))
    if participant is None:
        participant = cur.one("SELECT participants.*::participants FROM participants "
                              "WHERE username=%s", (old_username,))
        #XXX Check for username changes!
    if participant is None:
        raise Exception("Unable to get a participant.")
    return participant


def create_route(cur, thing):
    participant = get_participant(cur, thing['links']['customer'], thing['description'])
    route = ExchangeRoute.insert(participant, 'balanced-cc', '/cards/'+thing['links']['source'])
    return route


def pick_route(cur, routes, thing):
    import pdb; pdb.set_trace()
    route = None
    return route


def process_thing(cur, thing):
    print(thing['description'], end='')
    route, participant = None, None

    # Try to pick a route.
    routes = cur.all( "SELECT * FROM exchange_routes "
                      "WHERE network='balanced-cc' and address='/cards/'||%s"
                    , (thing['links']['source'],)
                     )
    if not routes:
        print("No route for {}.".format(thing['links']['source']), end='')
        route = create_route(cur, thing)
    elif len(routes) > 1:
        print("{} routes for {}.".format(len(routes), thing['links']['source'], end=''))
        route = pick_route(cur, routes, thing)
    else:
        route = routes[0]

    # Ack.
    if type(route.participant) is long:
        route.set_attributes(participant=Participant.from_id(route.participant))

    # Bail if we don't have a route.
    if not route:
        print()
        return

    # Use the username that is current as of this db transaction.
    username = cur.one( "SELECT username FROM participants WHERE id=%s"
                      , (route.participant,)
                       )

    params = dict( status   = thing['status']
                 , amount   = D(thing['amount']) / 100
                 , ts       = thing['created_at']
                 , route_id = route.id
                 , username = username
                  )

    nalready = cur.one("""\
        SELECT id FROM exchanges
         WHERE status = %(status)s
           AND route = %(route)s
           AND participant = %(username)s
           AND amount + fee = %(amount)s
           AND ((   (%(ts)s::timestamptz - "timestamp") < interval '0'
                AND (%(ts)s::timestamptz - "timestamp") > interval '-15m'
                ) OR (
                    (%(ts)s::timestamptz - "timestamp") > interval '0'
                AND (%(ts)s::timestamptz - "timestamp") < interval '15m'
                ))
    """, dict(params, route=route.id))
    if nalready == 1:
        print("Already linked!")
        return

    mogrified = cur.mogrify("""\
    WITH rows AS (
        UPDATE exchanges
           SET status = %(status)s
             , route = %(route_id)s
         WHERE (status is NULL or status = %(status)s)
           AND route is NULL
           AND participant = %(username)s
           AND amount + fee = %(amount)s
           AND ((   (%(ts)s::timestamptz - "timestamp") < interval '0'
                AND (%(ts)s::timestamptz - "timestamp") > interval '-15m'
                ) OR (
                    (%(ts)s::timestamptz - "timestamp") > interval '0'
                AND (%(ts)s::timestamptz - "timestamp") < interval '15m'
                ))
     RETURNING 1 AS count
    ) SELECT * FROM rows;
    """, params)
    print("Updating exchanges for {}: ".format(username), end='')
    nupdated = cur.one(mogrified, default=0)
    Error = None
    if nupdated == 0:
        Error = NoExchange
    elif nupdated > 1:
        Error = AmbiguousExchange
    if Error:
        raise Error(params['username'], params['amount'], params['ts'], mogrified)
    print("{}.".format(nupdated))


with db.get_cursor() as cur:
    try:
        for filename in reversed(sorted(os.listdir(dirname))):
            data = json.load(open(os.path.join(dirname, filename)))

            for key in data:
                if key in ('links', 'meta'): continue
                things = reversed(data[key])

                if key == 'card_holds':
                    continue  # We don't track these.

                if key != 'debits':
                    continue  # Let's start with credit card charges.

                for thing in things:
                    print("{} | ".format(thing['created_at']), end='')

                    try:
                        process_thing(cur, thing)
                    except KeyboardInterrupt:
                        raise
                    except:
                        print("Exception! Thing:")
                        pprint(thing)
                        print('-'*78)
                        traceback.print_exc(file=sys.stdout)
                        print('='*78)

                    if i > I: raise Done()
                    i += 1
    except Done:
        pass
    print()
    raise Exception  # trigger rollback

@rohitpaulk
Copy link
Contributor

@chadwhitacre
Copy link
Contributor Author

Yeah, yeah. :P

@chadwhitacre
Copy link
Contributor Author

@rohitpaulk et al. Moving to a PR, #3807.

chadwhitacre added a commit that referenced this issue Oct 16, 2015
Transferwise is needed for #3828.
Both are needed for #2779.
@chadwhitacre chadwhitacre changed the title backfill the status and route columns of the exchanges table backfill the status, route, and ref columns of the exchanges table Oct 21, 2015
@chadwhitacre
Copy link
Contributor Author

Got this far enough for the Balanced Shutdown, taking off that milestone now.

@chadwhitacre chadwhitacre removed this from the Balanced Shutdown milestone Oct 21, 2015
@chadwhitacre
Copy link
Contributor Author

Done for the three TransferWise exchanges to date.

@chadwhitacre
Copy link
Contributor Author

Moving to a new PR for the remaining exchanges: #3912.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants