export_data.py (forked from a16z/nft-analyst-starter-pack)
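Typical invocation, per the click options defined below: python export_data.py -a <alchemy-api-key> -c <contract-address>. The angle-bracketed values are placeholders for your own Alchemy API key and the NFT collection's contract address.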
import sys

# Fail fast on unsupported interpreter versions before importing anything else.
if sys.version_info[0] == 3 and sys.version_info[1] >= 10:
    raise Exception("Python >=3.10 is not supported at this time.")

import asyncio
import os
import tempfile
import warnings
from datetime import datetime, timedelta

import click
import ethereumetl
import numpy as np
import pandas as pd
from ethereumetl.service.eth_service import EthService
from web3 import Web3

from core.generate_metadata_output import generate_metadata_output
from core.generate_sales_output import generate_sales_output
from core.generate_transfers_output import generate_transfers_output
from jobs.export_logs import export_logs
from jobs.export_token_transfers import export_token_transfers
from jobs.get_nft_metadata import get_metadata_for_collection
from jobs.update_block_to_date_mapping import update_block_to_date_mapping
from jobs.update_eth_prices import update_eth_prices
from utils.check_contract_support import check_contract_support
from utils.extract_unique_column_value import extract_unique_column_value
from utils.find_deployment_block_for_contract import find_deployment_block_for_contract


@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
@click.option(
    "-a",
    "--alchemy-api-key",
    required=True,
    type=str,
    help="The Alchemy API key to use for data extraction.",
)
@click.option(
    "-c",
    "--contract-address",
    required=True,
    type=str,
    help="The contract address of the desired NFT collection.",
)
def export_data(contract_address, alchemy_api_key):
    # Check if contract address is supported by Alchemy
    check_contract_support(
        alchemy_api_key=alchemy_api_key, contract_address=contract_address
    )

    warnings.simplefilter(action="ignore", category=FutureWarning)

    print("Process started for contract address: " + str(contract_address))

    # Assign file paths (persisting files only)
    date_block_mapping_csv = "./raw-data/date_block_mapping.csv"
    eth_prices_csv = "./raw-data/eth_prices.csv"
    sales_csv = "sales_" + contract_address + ".csv"
    metadata_csv = "metadata_" + contract_address + ".csv"
    transfers_csv = "transfers_" + contract_address + ".csv"
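    # The two ./raw-data files are shared reference data maintained by the
    # update_* jobs below; the per-collection sales/metadata/transfers CSVs
    # land in the current working directory.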

    # Set provider
    provider_uri = "https://eth-mainnet.alchemyapi.io/v2/" + alchemy_api_key
    web3 = Web3(Web3.HTTPProvider(provider_uri))
    eth_service = EthService(web3)
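
    # Tuning knobs forwarded to the ethereum-etl export jobs below: larger
    # batches and more workers trade heavier RPC load for faster extraction.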
    ethereum_etl_batch_size = 1000
    ethereum_etl_max_workers = 8

    # Get block range
    start_block = find_deployment_block_for_contract(contract_address, web3)
    yesterday = datetime.today() - timedelta(days=1)
    _, end_block = eth_service.get_block_range_for_date(yesterday)
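    # The export therefore spans the collection's deployment block through the
    # last block mined yesterday, i.e. only fully completed days are covered.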

    with tempfile.NamedTemporaryFile(
        delete=False
    ) as logs_csv, tempfile.NamedTemporaryFile(
        delete=False
    ) as transaction_hashes_txt, tempfile.NamedTemporaryFile(
        delete=False
    ) as token_ids_txt, tempfile.NamedTemporaryFile(
        delete=False
    ) as raw_attributes_csv:
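        # Intermediate artifacts only: event logs, unique transaction hashes,
        # unique token IDs, and raw metadata attributes; delete=False leaves
        # the temp files on disk after the run.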

        # Export token transfers
        export_token_transfers(
            start_block=start_block,
            end_block=end_block,
            batch_size=ethereum_etl_batch_size,
            provider_uri=provider_uri,
            max_workers=ethereum_etl_max_workers,
            tokens=contract_address,
            output=transfers_csv,
        )
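        # Note: transfers_csv is written here first, then re-read and
        # overwritten in place by generate_transfers_output further down.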

        # Extract staging files
        extract_unique_column_value(
            input_filename=transfers_csv,
            output_filename=transaction_hashes_txt.name,
            column="transaction_hash",
        )

        extract_unique_column_value(
            input_filename=transfers_csv,
            output_filename=token_ids_txt.name,
            column="value",
        )
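        # Assumption about ethereum-etl's token_transfers schema: for ERC-721
        # transfers the "value" column carries the token ID (the slot ERC-20
        # amounts use), which is why unique token IDs are pulled from it.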

        # Export logs
        export_logs(
            start_block=start_block,
            end_block=end_block,
            batch_size=ethereum_etl_batch_size,
            provider_uri=provider_uri,
            max_workers=ethereum_etl_max_workers,
            tx_hashes_filename=transaction_hashes_txt.name,
            output=logs_csv.name,
        )

        # Update date block mapping
        update_block_to_date_mapping(
            filename=date_block_mapping_csv, eth_service=eth_service
        )

        # Update ETH prices
        update_eth_prices(filename=eth_prices_csv)
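        # Both reference files (date/block mapping and ETH prices, presumably
        # quoted in USD) feed generate_sales_output below.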

        # Generate sales output
        generate_sales_output(
            transfers_file=transfers_csv,
            logs_file=logs_csv.name,
            date_block_mapping_file=date_block_mapping_csv,
            eth_prices_file=eth_prices_csv,
            output=sales_csv,
        )

        # Generate transfers output
        generate_transfers_output(
            transfers_file=transfers_csv,
            date_block_mapping_file=date_block_mapping_csv,
            output=transfers_csv,
        )

        # Get metadata for collection
        get_metadata_for_collection(
            api_key=alchemy_api_key,
            contract_address=contract_address,
            output=raw_attributes_csv.name,
        )

        # Generate metadata output
        generate_metadata_output(
            raw_attributes_file=raw_attributes_csv.name,
            token_ids_file=token_ids_txt.name,
            output=metadata_csv,
        )

    print(
        "Data exported to " + sales_csv + ", " + metadata_csv + ", and " + transfers_csv
    )


if __name__ == "__main__":
    export_data()