# transaction_database.py
import postgres_connection
import json
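
# The postgres_connection helper is not part of this file. This module assumes it
# exposes a query(statement, args) function that runs a parameterized SQL statement
# (with %s placeholders) and returns the result rows as dicts keyed by column name.
# A minimal sketch of that assumed contract, e.g. built on psycopg2, could look like:
#
#     import psycopg2
#     import psycopg2.extras
#
#     def query(statement, args=[]):
#         # connection string/credentials come from the deployment configuration
#         with psycopg2.connect(...) as con:
#             with con.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
#                 cur.execute(statement, args)
#                 return [dict(row) for row in cur.fetchall()]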


# note: query must be compatible with find_transaction_details_by_sig
# for tx-error list we want only transactions with tx_slot entry
# for tx search we want both tx_slot and tx_infos
def run_query(transaction_row_limit=50, filter_txsig=None):
    maprows = postgres_connection.query(
        """
        SELECT * FROM (
            SELECT
                signature,
                txi.transaction_id,
                ( SELECT count(distinct slot) FROM banking_stage_results_2.transaction_slot WHERE transaction_id=tx_slot.transaction_id ) AS num_relative_slots,
                (
                    SELECT ARRAY_AGG(json_build_object('slot', tx_slot.slot, 'error', err.error_text, 'count', count)::text)
                    FROM banking_stage_results_2.errors err
                    WHERE err.error_code=tx_slot.error_code
                ) AS all_errors,
                tx_slot.utc_timestamp,
                -- optional fields from transaction_infos
                ( txi is not null ) AS was_included_in_block,
                txi.cu_requested,
                txi.prioritization_fees
            FROM banking_stage_results_2.transactions txs
            -- INNER JOIN / LEFT JOIN
            {join_tx_slot} banking_stage_results_2.transaction_slot tx_slot USING (transaction_id)
            LEFT JOIN banking_stage_results_2.transaction_infos txi USING (transaction_id)
            WHERE true
                AND (%s or signature = %s AND (tx_slot IS NOT NULL OR txi IS NOT NULL))
        ) AS data
        -- transaction_id is required as tie breaker
        ORDER BY utc_timestamp DESC, transaction_id DESC
        LIMIT %s
        """
        # this approach is a bit ugly, but the performance of LEFT JOIN + WHERE not null is terrible
        .format(join_tx_slot="INNER JOIN" if filter_txsig is None else "LEFT JOIN"),
        [
            # the first parameter disables the signature filter entirely when no signature was given
            filter_txsig is None, filter_txsig,
            transaction_row_limit,
        ])

    for index, row in enumerate(maprows):
        row['pos'] = index + 1
        map_jsons_in_row(row)

    return maprows
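
# Each row produced by run_query is a dict keyed by the SELECT list above plus the
# post-processing: 'pos', 'signature', 'transaction_id', 'num_relative_slots',
# 'all_errors' (raw JSON strings), 'errors_array' (parsed, only set when the transaction had errors),
# 'utc_timestamp', 'was_included_in_block', 'cu_requested', 'prioritization_fees'.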


def query_transactions_by_address(account_key: str, transaction_row_limit=100):
    maprows = postgres_connection.query(
        """
        WITH tx_slot_data AS (
            SELECT
                transaction_id,
                min(slot) AS first_slot,
                min(utc_timestamp) AS min_utc_timestamp
            FROM banking_stage_results_2.transaction_slot tx_slot
            GROUP BY transaction_id
        )
        SELECT
            amt_txs.transaction_id,
            sort_nr,
            signature,
            ( txi is not null ) AS was_included_in_block,
            txi.cu_requested,
            txi.prioritization_fees,
            tx_slot_data.min_utc_timestamp AS utc_timestamp
        FROM banking_stage_results_2.accounts_map_transaction_latest amt_latest
        -- amt_latest.tx_ids is an array of transaction_ids limited to 1000 (see sidecar LIMIT_LATEST_TXS_PER_ACCOUNT)
        INNER JOIN unnest(amt_latest.tx_ids) WITH ORDINALITY AS amt_txs(transaction_id, sort_nr) ON true
        INNER JOIN banking_stage_results_2.accounts acc ON acc.acc_id=amt_latest.acc_id
        INNER JOIN banking_stage_results_2.transactions txs ON txs.transaction_id=amt_txs.transaction_id
        LEFT JOIN tx_slot_data ON tx_slot_data.transaction_id=amt_txs.transaction_id
        LEFT JOIN banking_stage_results_2.transaction_infos txi ON txi.transaction_id=amt_txs.transaction_id
        WHERE true
        -- note: the check for txi is actually useless at the moment, as txi is always updated along with amt_latest
        AND (tx_slot_data IS NOT NULL OR txi IS NOT NULL)
        AND account_key = %s
        ORDER BY sort_nr DESC
        LIMIT %s
        """, [
            account_key,
            # note: only a limited number of transaction ids per account is stored in the postgres array amt_latest.tx_ids
            transaction_row_limit,
        ])

    for index, row in enumerate(maprows):
        row['pos'] = index + 1

    return maprows


# may return multiple rows
def search_transaction_by_sig(tx_sig: str):
    maprows = run_query(transaction_row_limit=10, filter_txsig=tx_sig)
    return maprows


# return (rows, is_limit_exceeded)
def search_transactions_by_address(account_key: str) -> (list, bool):
    page_size = 10
    maprows = query_transactions_by_address(transaction_row_limit=page_size + 1, account_key=account_key)

    if len(maprows) == page_size + 1:
        print("limit exceeded while searching for transactions by address")
        return maprows[:page_size], True

    return maprows, False


# parse the JSON strings produced by ARRAY_AGG(json_build_object(...)::text) in run_query;
# 'errors_array' is only set when the transaction actually had error rows
def map_jsons_in_row(row):
    errors = []
    if row["all_errors"] is None:
        row["all_errors"] = []
        return
    for errors_json in row["all_errors"]:
        errors.append(json.loads(errors_json))
    row["errors_array"] = errors


def main():
    run_query()


if __name__ == "__main__":
    main()
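
# Example usage (illustrative only; the signature and account key below are placeholders):
#
#     tx_rows = search_transaction_by_sig("<base58 transaction signature>")
#     addr_rows, limit_exceeded = search_transactions_by_address("<base58 account key>")
#     for row in addr_rows:
#         print(row["pos"], row["signature"], row["utc_timestamp"])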