diff --git a/app-spec.yml b/app-spec.yml
index 74efe80..bd5274c 100644
--- a/app-spec.yml
+++ b/app-spec.yml
@@ -55,17 +55,17 @@ services:
         tag: ${GITHUB_SHA}
     health_check:
       http_path: /api/v1
-      initial_delay_seconds: 60
-      timeout_seconds: 1200
-      period_seconds: 20
+      initial_delay_seconds: 0
+      timeout_seconds: 1
+      period_seconds: 10
       success_threshold: 1
-      failure_threshold: 20
+      failure_threshold: 6
     http_port: 8050
     routes:
     - path: /api/v1/
       preserve_path_prefix: true
     instance_count: 1
-    instance_size_slug: basic-xs
+    instance_size_slug: basic-s
     name: carbon-api
-    run_command: gunicorn --worker-tmp-dir /dev/shm --timeout ${GUNICORN_TIMEOUT} src.apps.api.app:app
+    run_command: gunicorn --workers=1 --threads=2 --worker-tmp-dir /dev/shm --timeout ${GUNICORN_TIMEOUT} src.apps.api.app:app
     source_dir: /
diff --git a/src/apps/api/app.py b/src/apps/api/app.py
index 6fad8de..ebfbf8b 100644
--- a/src/apps/api/app.py
+++ b/src/apps/api/app.py
@@ -25,14 +25,27 @@ def output_json(data, code, headers=None):
 api.add_resource(endpoints.CreditsRaw, '/credits/raw')
 api.add_resource(endpoints.CreditsGlobalAggregation, '/credits/agg')
 api.add_resource(endpoints.CreditsDatesAggregation, '/credits/agg/<freq>')
-api.add_resource(endpoints.CreditsCountriesAggregation, '/credits/agg/countries')
-api.add_resource(endpoints.CreditsProjectsAggregation, '/credits/agg/projects')
-api.add_resource(endpoints.CreditsMethodologiesAggregation, '/credits/agg/methodologies')
+api.add_resource(endpoints.CreditsCountriesAggregation, '/credits/agg/country')
+api.add_resource(endpoints.CreditsProjectsAggregation, '/credits/agg/project')
+api.add_resource(endpoints.CreditsMethodologiesAggregation, '/credits/agg/methodology')
 api.add_resource(endpoints.CreditsVintageAggregation, '/credits/agg/vintage')
+api.add_resource(endpoints.CreditsPoolAggregation, '/credits/agg/pool')
+api.add_resource(endpoints.CreditsPoolVintageAggregation, '/credits/agg/pool/vintage')
+api.add_resource(endpoints.CreditsPoolMethodologyAggregation, '/credits/agg/pool/methodology')
+api.add_resource(endpoints.CreditsPoolProjectsAggregation, '/credits/agg/pool/project')
+api.add_resource(endpoints.CreditsPoolDatesAggregation, '/credits/agg/pool/<freq>')
+api.add_resource(endpoints.CreditsBridgeVintageAggregation, '/credits/agg/bridge/vintage')
+api.add_resource(endpoints.CreditsBridgeCountriesAggregation, '/credits/agg/bridge/country')
+api.add_resource(endpoints.CreditsBridgeProjectsAggregation, '/credits/agg/bridge/project')
+api.add_resource(endpoints.CreditsBridgeDateAggregation, '/credits/agg/bridge/<freq>')
+api.add_resource(endpoints.CreditsBridgeAggregation, '/credits/agg/bridge')
+
 api.add_resource(endpoints.PoolsRaw, '/pools/raw')
 api.add_resource(endpoints.PoolsGlobalAggregation, '/pools/agg')
 api.add_resource(endpoints.PoolsDatesAggregation, '/pools/agg/<freq>')
+api.add_resource(endpoints.PoolsTokensAndDatesAggregation, '/pools/agg/tokens/<freq>')
+
 api.add_resource(endpoints.Holders, '/holders')
@@ -48,7 +61,9 @@
 api.add_resource(endpoints.RetirementsDatesAggregation, '/retirements/<token>/agg/<freq>')
 api.add_resource(endpoints.RetirementsTokensAggregation, '/retirements/klima/agg/tokens')
 api.add_resource(endpoints.RetirementsTokensAndDatesAggregation, '/retirements/klima/agg/tokens/<freq>')
-api.add_resource(endpoints.RetirementsBeneficiariesAggregation, '/retirements/<token>/agg/beneficiaries')
+api.add_resource(endpoints.RetirementsBeneficiariesAggregation, '/retirements/<token>/agg/beneficiary')
+api.add_resource(endpoints.RetirementsOriginAndDatesAggregation, '/retirements/all/agg/origin/<freq>')
+
 api.add_resource(endpoints.Info, '', '/')
 
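Reviewer note: the aggregation routes above move to singular segment names (country, project, methodology) and gain pool/ and bridge/ families; info.py and test.py below must stay in sync with this table. A minimal smoke check of a few renamed routes, assuming a locally running instance (base URL is hypothetical):

import requests

BASE_URL = "http://localhost:8050/api/v1"  # assumption: local dev server

for path in ["credits/agg/country", "credits/agg/pool/vintage", "credits/agg/bridge/monthly"]:
    response = requests.get(f"{BASE_URL}/{path}")
    print(path, "=>", response.status_code)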
diff --git a/src/apps/api/endpoints/__init__.py b/src/apps/api/endpoints/__init__.py
index e2f0b7c..67d878d 100644
--- a/src/apps/api/endpoints/__init__.py
+++ b/src/apps/api/endpoints/__init__.py
@@ -7,12 +7,23 @@
     CreditsProjectsAggregation,
     CreditsMethodologiesAggregation,
     CreditsVintageAggregation,
+    CreditsPoolAggregation,
+    CreditsPoolVintageAggregation,
+    CreditsPoolMethodologyAggregation,
+    CreditsPoolProjectsAggregation,
+    CreditsPoolDatesAggregation,
+    CreditsBridgeVintageAggregation,
+    CreditsBridgeCountriesAggregation,
+    CreditsBridgeProjectsAggregation,
+    CreditsBridgeDateAggregation,
+    CreditsBridgeAggregation,
     CreditsGlobalAggregation
 )
 from .pools import (  # noqa
     PoolsRaw,
     PoolsGlobalAggregation,
-    PoolsDatesAggregation
+    PoolsDatesAggregation,
+    PoolsTokensAndDatesAggregation
 )
 from .holders import Holders  # noqa
 from .prices import Prices  # noqa
@@ -23,6 +34,7 @@
     RetirementsBeneficiariesAggregation,
     RetirementsGlobalAggregation,
     RetirementsTokensAndDatesAggregation,
-    RetirementsTokensAggregation
+    RetirementsTokensAggregation,
+    RetirementsOriginAndDatesAggregation,
 )
 from .tokens import Tokens  # noqa
diff --git a/src/apps/api/endpoints/carbon_metrics.py b/src/apps/api/endpoints/carbon_metrics.py
index fe713e4..f79cc4e 100644
--- a/src/apps/api/endpoints/carbon_metrics.py
+++ b/src/apps/api/endpoints/carbon_metrics.py
@@ -1,7 +1,10 @@
-from flask_restful import Resource
+from flask_restful import Resource, reqparse
 from src.apps.services import Metrics as Service, layout_cache, DashArgumentException
 from . import helpers
 
+carbon_metrics_parser = reqparse.RequestParser()
+carbon_metrics_parser.add_argument('sample', type=helpers.validate_list(["daily", "monthly"]), default="all")
+
 
 class CarbonMetrics(Resource):
     @layout_cache.cached(query_string=True)
@@ -18,5 +21,11 @@ def get(self, chain):
         service = Service()
         if (chain not in ["eth", "polygon", "celo", "all"]):
             raise DashArgumentException(f"Unknown chain '{chain}'")
+        args = carbon_metrics_parser.parse_args()
+        sample = args["sample"]
         metrics = getattr(service, chain)()
+
+        if sample == "monthly":
+            metrics = metrics.monthly_sample("date")
+
         return metrics
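The new sample=monthly argument depends on the monthly_sample command added to src/apps/services/cache.py further down in this diff. The underlying pandas idiom, sketched on toy data (not the real metrics frame), keeps the last daily row of each calendar month:

import pandas as pd

df = pd.DataFrame({
    "date": pd.date_range("2022-01-01", "2022-03-15", freq="D"),
    "value": range(74),
})
monthly = df.groupby(
    pd.DatetimeIndex(df["date"]).to_period("M"), group_keys=False
).nth(-1).reset_index(drop=True)
print(monthly)  # one row per month: 2022-01-31, 2022-02-28, 2022-03-15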
bridge == "offchain" else "bridged_date" else: return helpers.status_date_column(status) - def get_credits(self): + @helpers.with_daterange_filter("bridged_date") + @helpers.with_daterange_filter("issuance_date") + @helpers.with_daterange_filter("retirement_date") + @helpers.with_daterange_filter("deposited_date") + @helpers.with_daterange_filter("redeemed_date") + def get_credits(self, bridge=None): args = credits_filter_parser.parse_args() - bridge = args["bridge"] + if not bridge: + bridge = args["bridge"] pool = args["pool"] status = args["status"] + offchain_filter = args["offchain_filter"] + + # Accept a 'all' value for pools + if pool == "all": + pool = None + + # Accept a 'all' value for status + if status == "all": + status = None # auto select status if status is None: status = "issued" if bridge == "offchain" else "bridged" - # Return credits - return Service().filter(bridge, pool, status) + # Select credits + df = Service().filter(bridge, pool, status) + + # Filter offchain credits + if offchain_filter: + df = df.offchain_filter(offchain_filter) + + return df + + def get_pooled_credits(self, bridge=None): + """ + Hack: Filter the polygon datasets for pool analysis + because the pool columns quantities are not properly made + """ + args = credits_filter_parser.parse_args() + credits = self.get_credits(bridge) + if args["bridge"] in ["toucan", "c3", "polygon"]: + credits = credits.pool_analysis() + return credits class CreditsRaw(AbstractCredits): @@ -59,9 +98,6 @@ class CreditsRaw(AbstractCredits): """ ) @helpers.with_output_formatter - @helpers.with_daterange_filter("bridged_date") - @helpers.with_daterange_filter("issuance_date") - @helpers.with_daterange_filter("retirement_date") def get(self): return self.get_credits() @@ -77,11 +113,6 @@ class CreditsDatesAggregation(AbstractCredits): """ ) @helpers.with_output_formatter - @helpers.with_daterange_filter("bridged_date") - @helpers.with_daterange_filter("issuance_date") - @helpers.with_daterange_filter("retirement_date") - @helpers.with_daterange_filter("deposited_date") - @helpers.with_daterange_filter("redeemed_date") def get(self, freq): return helpers.apply_date_aggregation( DATE_FIELDS, @@ -147,6 +178,159 @@ def get(self): return credits +class CreditsPoolAggregation(AbstractCredits): + @layout_cache.cached(query_string=True) + @helpers.with_errors_handler + @helpers.with_help( + f"""{BASE_HELP} + """ + ) + def get(self): + credits = self.get_credits().pool_analysis().pool_summary().resolve().to_dict(orient='records')[0] + return credits + + +class CreditsPoolVintageAggregation(AbstractCredits): + @layout_cache.cached(query_string=True) + @helpers.with_errors_handler + @helpers.with_help( + f"""{BASE_HELP} + {helpers.OUTPUT_FORMATTER_HELP} + """ + ) + @helpers.with_output_formatter + def get(self): + credits = self.get_pooled_credits().vintage_agg().pool_summary("vintage") + return credits + + +class CreditsPoolMethodologyAggregation(AbstractCredits): + @layout_cache.cached(query_string=True) + @helpers.with_errors_handler + @helpers.with_help( + f"""{BASE_HELP} + {helpers.OUTPUT_FORMATTER_HELP} + """ + ) + @helpers.with_output_formatter + def get(self): + credits = self.get_pooled_credits().methodologies_agg().pool_summary("methodology") + return credits + + +class CreditsPoolCountriesAggregation(AbstractCredits): + @layout_cache.cached(query_string=True) + @helpers.with_errors_handler + @helpers.with_help( + f"""{BASE_HELP} + {helpers.OUTPUT_FORMATTER_HELP} + """ + ) + @helpers.with_output_formatter + def get(self): + 
diff --git a/src/apps/api/endpoints/helpers.py b/src/apps/api/endpoints/helpers.py
index 4c90cc8..2ddff3e 100644
--- a/src/apps/api/endpoints/helpers.py
+++ b/src/apps/api/endpoints/helpers.py
@@ -125,7 +125,10 @@ def wrapper(*func_args, **kwargs):
             page_size = items_count
 
         # Compute info
-        pages_count = math.ceil(items_count / page_size)
+        if items_count > 0:
+            pages_count = math.ceil(items_count / page_size)
+        else:
+            pages_count = 1
 
         # Slice dataframe
         df = df[page_size * page:page_size * (page + 1)]
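The guarded pages_count keeps an empty result set from reporting zero pages, and from the ZeroDivisionError the old line hit once page_size was clamped to an items_count of 0. The intent in isolation:

import math

def pages_count(items_count, page_size):
    # an empty result set still counts as one (empty) page
    return math.ceil(items_count / page_size) if items_count > 0 else 1

assert pages_count(0, 20) == 1
assert pages_count(45, 20) == 3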
diff --git a/src/apps/api/endpoints/info.py b/src/apps/api/endpoints/info.py
index c762718..fb926f7 100644
--- a/src/apps/api/endpoints/info.py
+++ b/src/apps/api/endpoints/info.py
@@ -10,10 +10,21 @@ def get(self):
             "credits/agg",
             "credits/agg/daily",
             "credits/agg/monthly",
-            "credits/agg/countries",
-            "credits/agg/projects",
-            "credits/agg/methodologies",
+            "credits/agg/country",
+            "credits/agg/project",
+            "credits/agg/methodology",
             "credits/agg/vintage",
+            "credits/agg/pool",
+            "credits/agg/pool/vintage",
+            "credits/agg/pool/methodology",
+            "credits/agg/pool/project",
+            "credits/agg/pool/daily",
+            "credits/agg/pool/monthly",
+            "credits/agg/bridge/vintage",
+            "credits/agg/bridge/country",
+            "credits/agg/bridge/project",
+            "credits/agg/bridge/daily",
+            "credits/agg/bridge/monthly",
             "pools/raw",
             "pools/agg",
             "pools/agg/daily",
@@ -28,12 +39,14 @@
             "retirements/all/agg",
             "retirements/all/agg/daily",
             "retirements/all/agg/monthly",
-            "retirements/all/agg/beneficiaries",
+            "retirements/all/agg/beneficiary",
+            "retirements/all/agg/origin/daily",
+            "retirements/all/agg/origin/monthly",
             "retirements/klima/raw",
             "retirements/klima/agg",
             "retirements/klima/agg/daily",
             "retirements/klima/agg/monthly",
-            "retirements/klima/agg/beneficiaries",
+            "retirements/klima/agg/beneficiary",
             "retirements/klima/agg/tokens",
             "retirements/klima/agg/tokens/daily",
             "retirements/klima/agg/tokens/monthly"
diff --git a/src/apps/api/endpoints/pools.py b/src/apps/api/endpoints/pools.py
index 027f5d0..4cc76e5 100644
--- a/src/apps/api/endpoints/pools.py
+++ b/src/apps/api/endpoints/pools.py
@@ -3,7 +3,7 @@
 from . import helpers
 
-POOLS = ["ubo", "nbo", "nct", "bct"]
+POOLS = ["ubo", "nbo", "nct", "bct", "all"]
 STATUSES = ["deposited", "redeemed", "retired"]
 OPERATORS = ["sum", "cumsum"]
 DATE_FIELDS = [
@@ -31,6 +31,9 @@ def get_default_date_field(self):
         else:
             return helpers.status_date_column(status)
 
+    @helpers.with_daterange_filter("retirement_date")
+    @helpers.with_daterange_filter("deposited_date")
+    @helpers.with_daterange_filter("redeemed_date")
     def get_pool(self):
         args = pools_filter_parser.parse_args()
         pool = args["pool"]
@@ -49,9 +52,6 @@ class PoolsRaw(AbstractPools):
         """
     )
     @helpers.with_output_formatter
-    @helpers.with_daterange_filter("retirement_date")
-    @helpers.with_daterange_filter("deposited_date")
-    @helpers.with_daterange_filter("redeemed_date")
     def get(self):
         return self.get_pool()
 
@@ -65,9 +65,6 @@ class PoolsDatesAggregation(AbstractPools):
         """
     )
     @helpers.with_output_formatter
-    @helpers.with_daterange_filter("retirement_date")
-    @helpers.with_daterange_filter("deposited_date")
-    @helpers.with_daterange_filter("redeemed_date")
     def get(self, freq):
         return helpers.apply_date_aggregation(
             DATE_FIELDS,
@@ -77,6 +74,22 @@ def get(self, freq):
         )
 
 
+class PoolsTokensAndDatesAggregation(AbstractPools):
+    @layout_cache.cached(query_string=True)
+    @helpers.with_errors_handler
+    @helpers.with_help(
+        f"""
+        Aggregates pool data on date and token
+        {helpers.OUTPUT_FORMATTER_HELP}
+        """
+    )
+    @helpers.with_output_formatter
+    def get(self, freq):
+        date_field = self.get_default_date_field()
+        pools = self.get_pool().date_agg(date_field, freq).pool_summary(date_field)
+        return pools
+
+
 class PoolsGlobalAggregation(AbstractPools):
     @layout_cache.cached(query_string=True)
     @helpers.with_errors_handler
diff --git a/src/apps/api/endpoints/retirements.py b/src/apps/api/endpoints/retirements.py
index 643c582..2c4aad0 100644
--- a/src/apps/api/endpoints/retirements.py
+++ b/src/apps/api/endpoints/retirements.py
@@ -50,7 +50,22 @@ class RetirementsTokensAndDatesAggregation(Resource):
     )
     @helpers.with_output_formatter
     def get(self, freq):
-        retirements = Service().get("klima").date_agg(["retirement_date"], freq).summary()
+        retirements = Service().get("klima").date_agg(["retirement_date"], freq).token_summary()
         return retirements
 
 
+class RetirementsOriginAndDatesAggregation(Resource):
+    @layout_cache.cached(query_string=True)
+    @helpers.with_errors_handler
+    @helpers.with_help(
+        f"""
+        Aggregates all retirements on retirement date and origin
+        {helpers.OUTPUT_FORMATTER_HELP}
+        """
+    )
+    @helpers.with_output_formatter
+    def get(self, freq):
+        retirements = Service().get("all").date_agg(["retirement_date"], freq).origin_summary()
+        return retirements
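Example request for the new origin aggregation (route registered in app.py above; base URL is hypothetical):

import requests

BASE_URL = "http://localhost:8050/api/v1"  # assumption: local dev server

response = requests.get(f"{BASE_URL}/retirements/all/agg/origin/monthly")
print(response.status_code)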
"retirements/all/agg", "retirements/all/agg/daily", "retirements/all/agg/monthly", - "retirements/all/agg/beneficiaries", + "retirements/all/agg/beneficiary", + "retirements/all/agg/origin/daily", + "retirements/all/agg/origin/monthly", "retirements/klima/raw", "retirements/klima/agg", "retirements/klima/agg/daily", "retirements/klima/agg/monthly", - "retirements/klima/agg/beneficiaries", + "retirements/klima/agg/beneficiary", "retirements/klima/agg/tokens", "retirements/klima/agg/tokens/daily", "retirements/klima/agg/tokens/monthly" @@ -44,10 +55,10 @@ url = f"{BASE_URL}/{path}" response = requests.get(url) if response.status_code != 200: - print(f"{url} => Failed ({response.status_code})") print("---") print(response.text) print("---") + print(f"{url} => Failed ({response.status_code})") exit(1) print(f"{url} => OK") diff --git a/src/apps/services/cache.py b/src/apps/services/cache.py index fa793de..beeef0c 100644 --- a/src/apps/services/cache.py +++ b/src/apps/services/cache.py @@ -154,15 +154,19 @@ def convert_date(date): if isinstance(df[date_column][0], datetime.date): date = datetime.date(date.year, date.month, date.day) return date + + if end is None and begin is None: + return df + if df.empty: return df if end is not None: df = df[ - (df[date_column] <= convert_date(end)) + (df[date_column] <= end) ] if begin is not None: df = df[ - (df[date_column] >= convert_date(begin)) + (df[date_column] >= begin) ] return df @@ -184,7 +188,7 @@ def daily_agg(self, df, columns): date_column = columns[0] """Adds an aggregation by day""" df = self.date_manipulations(df, date_column, "daily") - df = df.groupby(columns) + df = df.groupby(columns, group_keys=False) return df def monthly_agg(self, df, columns): @@ -193,7 +197,7 @@ def monthly_agg(self, df, columns): columns = [columns] date_column = columns[0] df = self.date_manipulations(df, date_column, "monthly") - df = df.groupby(columns) + df = df.groupby(columns, group_keys=False) return df @final_cached_command() @@ -222,7 +226,7 @@ def sum(self, df, column): def sum_over_time(self, df, date_column, column, freq): df = self.date_manipulations(df, date_column, freq) df = df.sort_values(by=date_column, ascending=True) - df = df.groupby(date_column)[column].sum().to_frame().reset_index() + df = df.groupby(date_column, group_keys=False)[column].sum().to_frame().reset_index() df[column] = df[column].cumsum() return df @@ -231,6 +235,14 @@ def cumsum(self, df, column): """Cumulative sum""" return df[column].cumsum() + @chained_cached_command() + def monthly_sample(self, df, date_column): + """Samples daily data into monthly data""" + return df.groupby( + pd.DatetimeIndex(df[date_column]).to_period('M'), + group_keys=False + ).nth(-1).reset_index(drop=True) + def date_manipulations(self, df, date_column, freq): if date_column not in df: raise DashArgumentException(f"Unknown column '{date_column}'") diff --git a/src/apps/services/credits.py b/src/apps/services/credits.py index 1ee63a9..9250a2a 100644 --- a/src/apps/services/credits.py +++ b/src/apps/services/credits.py @@ -21,6 +21,13 @@ def load_df(self, bridge: str, pool: str, status: str): if bridge in ["offchain"]: if status == "issued": df = s3.load("verra_data_v2") + elif status == "bridged": + df = s3.load("verra_data_v2") + df = df.query("toucan | c3 | moss") + # This is a hack to get all retired offsets even if the retirements occured offchain + elif status == "all_retired": + df = s3.load("verra_data_v2") + df = df[df["status"] == "Retired"] elif status == "retired": df = 
s3.load("verra_retirements") else: @@ -29,14 +36,14 @@ def load_df(self, bridge: str, pool: str, status: str): elif bridge in ["toucan", "c3", "polygon"]: if status == "bridged": df = s3.load("polygon_bridged_offsets_v2") - elif status == "retired": + elif status in ["retired", "all_retired"]: df = s3.load("polygon_retired_offsets_v2") else: raise helpers.DashArgumentException(f"Unknown credit status {status}") elif bridge in ["moss", "eth"]: if status == "bridged": df = s3.load("eth_moss_bridged_offsets_v2") - elif status == "retired": + elif status in ["retired", "all_retired"]: df = s3.load("eth_retired_offsets_v2") else: raise helpers.DashArgumentException(f"Unknown credit status {status}") @@ -54,6 +61,7 @@ def load_df(self, bridge: str, pool: str, status: str): "project_type", "region", "country", + "country_code", "methodology", "vintage", "name", @@ -71,12 +79,9 @@ def load_df(self, bridge: str, pool: str, status: str): df = df[df["bridge"].str.lower() == bridge.lower()].reset_index(drop=True) # Filter pool - if pool: - df = self.drop_duplicates(df) - if pool == "all": - df = self.filter_pool_quantity(df, "total_quantity") - else: - df = self.filter_pool_quantity(df, f"{pool}_quantity") + if pool and pool != "all": + quantity_column = f"{pool}_quantity" + df = df[df[quantity_column] > 0] return df @@ -90,63 +95,88 @@ def filter(self, df, bridge, pool, status): @chained_cached_command() def vintage_agg(self, df): """Adds an aggregation on vintage""" - df = df.groupby("vintage") + df = df.groupby("vintage", group_keys=False) return df @chained_cached_command() def countries_agg(self, df): - df = df.groupby(["country", "country_code"]) + df = df.groupby(["country", "country_code"], group_keys=False) return df @chained_cached_command() def projects_agg(self, df): - df = df.groupby("project_type") + df = df.groupby("project_type", group_keys=False) return df @chained_cached_command() def methodologies_agg(self, df): - df = df.groupby("methodology") + df = df.groupby("methodology", group_keys=False) return df - def _summary(self, df, result_cols): - """Creates a summary""" - group_by_cols = result_cols.copy() - group_by_cols.remove("quantity") - df = ( - df.groupby(group_by_cols)["quantity"] - .sum() - .to_frame() - .reset_index(drop=True) - ) - df = df[result_cols] + @chained_cached_command() + def pool_summary(self, df, kept_fields=[]): + columns = [ + "bct_quantity", + "nct_quantity", + "ubo_quantity", + "nbo_quantity", + "mco2_quantity" + ] + if isinstance(df, pd.DataFrame): + df = df.groupby(lambda x: True, group_keys=False) + + if not isinstance(kept_fields, list): + kept_fields = [kept_fields] + + def summary(df): + res_df = pd.DataFrame() + for kept_field in kept_fields: + res_df[kept_field] = [df[kept_field].iloc[0]] + + total_quantity = df["total_quantity"].sum() + res_df["total_quantity"] = [total_quantity] + not_pooled_quantity = total_quantity + for column in columns: + if column in df: + column_quantity = df[column].sum() + res_df[column] = [column_quantity] + not_pooled_quantity -= column_quantity + res_df["not_pooled_quantity"] = [not_pooled_quantity] + + return res_df + df = df.apply(summary).reset_index(drop=True) return df - @final_cached_command() - def pool_summary(self, df): - """Creates a summary for pool data""" - return self._summary(df, [ - "project_id", - "token Address", - "quantity", - "vintage", - "country", - "project_type", - "methodology", - "name" - ]) + @chained_cached_command() + def bridge_summary(self, df, kept_fields): + # Full aggregation if we are 
+        # Full aggregation if we are presented with a dataframe that is not grouped yet
+        if isinstance(df, pd.DataFrame):
+            df = df.groupby(lambda x: True)
+
+        column = "quantity"
+        if not isinstance(kept_fields, list):
+            kept_fields = [kept_fields]
+
+        def summary(df):
+            res_df = pd.DataFrame()
+            for kept_field in kept_fields:
+                res_df[kept_field] = [df[kept_field].iloc[0]]
+            bridged_quantity = 0
+            for bridge in helpers.ALL_BRIDGES:
+                filtered_df = df[df["bridge"].str.lower() == bridge.lower()]
+                this_bridge_quantity = filtered_df[column].sum()
+                res_df[f"{bridge}_quantity"] = [this_bridge_quantity]
+                bridged_quantity = bridged_quantity + this_bridge_quantity
+            total_quantity = df[column].sum()
+            res_df["total_quantity"] = [total_quantity]
+            res_df["not_bridge_quantity"] = [total_quantity - bridged_quantity]
+            res_df["bridge_quantity"] = [bridged_quantity]
+            res_df["bridge_ratio"] = [bridged_quantity / total_quantity]
+            return res_df
+
+        df = df.apply(summary).reset_index(drop=True)
 
-    @final_cached_command()
-    def bridge_summary(self, df):
-        """Creates a summary for bridge data"""
-        return self._summary(df, [
-            "project_id",
-            "quantity",
-            "vintage",
-            "country",
-            "project_type",
-            "methodology",
-            "name"
-        ])
+        return df
 
     @final_cached_command()
     def average(self, df, column, weights):
@@ -177,7 +207,9 @@ def filter_pool_quantity(self, df, quantity_column):
 
         return df
 
-    def drop_duplicates(self, df):
+    @chained_cached_command()
+    def pool_analysis(self, df):
+        """When analysing pools we need a de-duplicated subset of the data"""
         df = df.drop_duplicates(subset=["token_address"], keep="first")
         df = df.reset_index(drop=True)
         return df
diff --git a/src/apps/services/helpers.py b/src/apps/services/helpers.py
index 38f608f..e896e0f 100644
--- a/src/apps/services/helpers.py
+++ b/src/apps/services/helpers.py
@@ -14,7 +14,7 @@ def status_date_column(status):
         return "issuance_date"
     elif status == "bridged":
         return "bridged_date"
-    elif status == "retired":
+    elif status in ["retired", "all_retired"]:
         return "retirement_date"
     elif status == "redeemed":
         return "redeemed_date"
diff --git a/src/apps/services/metrics.py b/src/apps/services/metrics.py
index c21cbd0..c3dfa77 100644
--- a/src/apps/services/metrics.py
+++ b/src/apps/services/metrics.py
@@ -42,7 +42,20 @@ def all(self, _df):
             "date_celo",
             "date_polygon"
         ])
+        all = all.fillna(method="backfill")
         all = all.fillna(0)
+
+        # Compute cross-chain protocol data
+        all["total_nct_supply"] = all.nct_supply_polygon + all.nct_supply_celo
+        all["total_bct_supply"] = all.bct_supply_polygon + all.bct_supply_celo
+        all["total_nbo_supply"] = all.nbo_supply_polygon
+        all["total_ubo_supply"] = all.ubo_supply_polygon
+        all["total_mco2_supply"] = all.mco2_supply_polygon + all.mco2_supply_celo + all.mco2_supply_eth
+
+        all["total_toucan_supply"] = all.total_bct_supply + all.total_nct_supply
+        all["total_c3_supply"] = all.total_ubo_supply + all.total_nbo_supply
+        all["total_moss_supply"] = all.total_mco2_supply
+
         return all
 
     @final_cached_command()
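Backfilling before zero-filling matters for the cross-chain totals above: a chain whose series starts later than the others would otherwise contribute hard zeros to the summed supplies. The two-step fill in isolation (fillna(method="backfill") is the older spelling of bfill()):

import pandas as pd

s = pd.Series([None, None, 5.0, None, 7.0, None])
print(s.bfill().fillna(0).tolist())  # [5.0, 5.0, 5.0, 7.0, 7.0, 0.0]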
diff --git a/src/apps/services/pools.py b/src/apps/services/pools.py
index d59eacf..19a72fd 100644
--- a/src/apps/services/pools.py
+++ b/src/apps/services/pools.py
@@ -1,3 +1,4 @@
+import pandas as pd
 from . import (
     helpers,
     S3,
@@ -17,7 +18,6 @@ def __init__(self, commands=[]):
 
     def load_df(self, pool, status):
         s3 = S3()
-        # autoselect bridge
         if status == "retired":
             df = s3.load("polygon_pools_retired_offsets")
         elif status == "deposited":
@@ -26,7 +26,6 @@ def load_df(self, pool, status):
             df = s3.load("polygon_pools_redeemed_offsets")
         else:
             raise helpers.DashArgumentException(f"Unknown credit status {status}")
-
         if pool and pool != "all":
             df = self.filter_df_by_pool(df, pool)
 
@@ -54,7 +53,24 @@ def quantities(self, bridge) -> dict:
 
         return dict(zip(pool_labels, values))
 
+    @chained_cached_command()
+    def pool_summary(self, df, date_field):
+        tokens = Tokens().get_dict()
+
+        def summary(df):
+            res_df = pd.DataFrame()
+            res_df[date_field] = [df[date_field].iloc[0]]
+            for token in tokens:
+                address = tokens[token]["token_address"]
+                filtered_df = df[df["pool"] == address]
+                res_df[f"{token.lower()}_quantity"] = [filtered_df["quantity"].sum()]
+                res_df[f"{token.lower()}_count"] = [filtered_df["quantity"].count()]
+            return res_df
+
+        df = df.apply(summary).reset_index(drop=True)
+        return df
+
     def filter_df_by_pool(self, df, pool):
-        pool_address = Tokens().get(pool)["address"]
+        pool_address = Tokens().get(pool)["token_address"]
         df = df[(df["pool"] == pool_address)].reset_index(drop=True)
         return df
diff --git a/src/apps/services/retirements.py b/src/apps/services/retirements.py
index 349c9b2..d70229d 100644
--- a/src/apps/services/retirements.py
+++ b/src/apps/services/retirements.py
@@ -10,18 +10,6 @@
 )
 
 
-def summary(df):
-    res_df = pd.DataFrame()
-    res_df["retirement_date"] = [df["retirement_date"].iloc[0]]
-    res_df["amount_retired"] = [df["quantity"].sum()]
-    res_df["number_of_retirements"] = [df["quantity"].count()]
-    for token in helpers.ALL_TOKENS:
-        filtered_df = df[df["token"] == token.upper()]
-        res_df[f"amount_retired_{token}"] = [filtered_df["quantity"].sum()]
-        res_df[f"number_of_retirements_{token}"] = [filtered_df["quantity"].count()]
-    return res_df
-
-
 class Retirements(DfCacheable):
     """Service for carbon metrics"""
     def __init__(self, commands=[]):
@@ -57,16 +45,42 @@ def filter_tokens(self, df, tokens):
         ]
 
     @chained_cached_command()
-    def summary(self, df):
+    def token_summary(self, df):
+        def summary(df):
+            res_df = pd.DataFrame()
+            res_df["retirement_date"] = [df["retirement_date"].iloc[0]]
+            res_df["amount_retired"] = [df["quantity"].sum()]
+            res_df["number_of_retirements"] = [df["quantity"].count()]
+            for token in helpers.ALL_TOKENS:
+                filtered_df = df[df["token"] == token.upper()]
+                res_df[f"amount_retired_{token}"] = [filtered_df["quantity"].sum()]
+                res_df[f"number_of_retirements_{token}"] = [filtered_df["quantity"].count()]
+            return res_df
+
+        df = df.apply(summary).reset_index(drop=True)
+        return df
+
+    @chained_cached_command()
+    def origin_summary(self, df):
+        def summary(df):
+            res_df = pd.DataFrame()
+            res_df["retirement_date"] = [df["retirement_date"].iloc[0]]
+            for origin in ["Offchain", "Klima"]:
+                filtered_df = df[df["origin"] == origin]
+                res_df[f"amount_retired_{origin.lower()}"] = [filtered_df["quantity"].sum()]
+                res_df[f"number_of_retirements_{origin.lower()}"] = [filtered_df["quantity"].count()]
+
+            return res_df
+
         df = df.apply(summary).reset_index(drop=True)
         return df
 
     @chained_cached_command()
     def beneficiaries_agg(self, df):
-        df = df.groupby("beneficiary")
+        df = df.groupby("beneficiary", group_keys=False)
         return df
 
     @chained_cached_command()
     def tokens_agg(self, df):
-        df = df.groupby("token")
+        df = df.groupby("token", group_keys=False)
         return df
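For reference, the per-date row shape that origin_summary produces, sketched on toy data:

import pandas as pd

df = pd.DataFrame({
    "retirement_date": ["2022-01-01"] * 3,
    "origin": ["Offchain", "Klima", "Klima"],
    "quantity": [5.0, 2.0, 3.0],
})
for origin in ["Offchain", "Klima"]:
    group = df[df["origin"] == origin]
    print(origin, group["quantity"].sum(), int(group["quantity"].count()))
# Offchain 5.0 1
# Klima 5.0 2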