Skip to content

Commit

Permalink
fix: bubble backfill transform. no op out of download metadata info.
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola committed Aug 26, 2024
1 parent 3213080 commit 7f3b391
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 80 deletions.
61 changes: 26 additions & 35 deletions backfill/src/worker/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ impl TryFrom<FetchedEncodedTransactionWithStatusMeta> for TransactionInfo {
fn try_from(
fetched_transaction: FetchedEncodedTransactionWithStatusMeta,
) -> Result<Self, Self::Error> {
tracing::info!("fetched transaction: {:?}", fetched_transaction);
let mut account_keys = Vec::new();
let encoded_transaction_with_status_meta = fetched_transaction.0;

Expand All @@ -62,49 +61,41 @@ impl TryFrom<FetchedEncodedTransactionWithStatusMeta> for TransactionInfo {
.transaction
.meta
.ok_or(ErrorKind::Generic(
"unable to get meta from transaction".to_string(),
"transaction metadata is missing".to_string(),
))?;

for address in msg.static_account_keys().iter().copied() {
account_keys.push(address);
}
let ui_loaded_addresses = meta.loaded_addresses;

let message_address_table_lookup = msg.address_table_lookups();

if message_address_table_lookup.is_some() {
if let OptionSerializer::Some(ui_lookup_table) = ui_loaded_addresses {
for address in ui_lookup_table.writable {
account_keys.push(PubkeyString(address).try_into()?);
}

for address in ui_lookup_table.readonly {
account_keys.push(PubkeyString(address).try_into()?);
}
let ui_loaded_addresses = match meta.loaded_addresses {
OptionSerializer::Some(addresses) => addresses,
OptionSerializer::None => {
return Err(ErrorKind::Generic(
"loaded addresses data is missing".to_string(),
))
}
}

let mut meta_inner_instructions = Vec::new();
OptionSerializer::Skip => {
return Err(ErrorKind::Generic(
"loaded addresses are skipped".to_string(),
));
}
};

let compiled_instruction = msg.instructions().to_vec();
let writtable_loaded_addresses = ui_loaded_addresses.writable;
let readable_loaded_addresses = ui_loaded_addresses.readonly;

let mut instructions = Vec::new();
if msg.address_table_lookups().is_some() {
for address in writtable_loaded_addresses {
account_keys.push(PubkeyString(address).try_into()?);
}

for inner in compiled_instruction {
instructions.push(InnerInstruction {
stack_height: Some(0),
instruction: CompiledInstruction {
program_id_index: inner.program_id_index,
accounts: inner.accounts,
data: inner.data,
},
});
for address in readable_loaded_addresses {
account_keys.push(PubkeyString(address).try_into()?);
}
}

meta_inner_instructions.push(InnerInstructions {
index: 0,
instructions,
});
let mut meta_inner_instructions = Vec::new();

if let OptionSerializer::Some(inner_instructions) = meta.inner_instructions {
for ix in inner_instructions {
Expand All @@ -117,9 +108,9 @@ impl TryFrom<FetchedEncodedTransactionWithStatusMeta> for TransactionInfo {
instruction: CompiledInstruction {
program_id_index: compiled.program_id_index,
accounts: compiled.accounts,
data: bs58::decode(compiled.data)
.into_vec()
.map_err(|e| ErrorKind::Generic(e.to_string()))?,
data: bs58::decode(compiled.data).into_vec().map_err(|e| {
ErrorKind::Generic(format!("Error decoding data: {}", e))
})?,
},
});
}
Expand Down
14 changes: 0 additions & 14 deletions core/src/metadata_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,20 +239,6 @@ impl DownloadMetadata {
&self,
download_metadata_info: &DownloadMetadataInfo,
) -> Result<(), MetadataJsonTaskError> {
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(self.pool.clone());

if let Some(asset_data) =
asset_data::Entity::find_by_id(download_metadata_info.asset_data_id.clone())
.one(&conn)
.await?
{
if asset_data.slot_updated == download_metadata_info.slot
&& asset_data.reindex == Some(false)
{
return Ok(());
}
}

perform_metadata_json_task(
self.client.clone(),
self.pool.clone(),
Expand Down
1 change: 1 addition & 0 deletions grpc-ingest/src/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ pub async fn run_v2(config: ConfigIngester) -> anyhow::Result<()> {
.connection(connection.clone())
.handler(move |info| {
let pt_transactions = Arc::clone(&pt_transactions);

Box::pin(async move {
let info = TransactionInfo::try_parse_msg(info)?;

Expand Down
5 changes: 4 additions & 1 deletion grpc-ingest/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,11 +558,14 @@ impl IngestStream {
break;
},
claimed = self.pending(&mut connection, &start) => {
info!("redis=claimed stream={} claimed={:?}", config.name, claimed);
if let Ok(Some(claimed)) = claimed {

let ids = claimed.ids.clone();
let ids: Vec<&str> = ids.iter().map(|info| info.id.as_str()).collect();

info!("redis=claimed stream={} claimed={:?}", config.name, ids.len());


for StreamId { id, map } in claimed.ids.into_iter() {
executor.push(IngestStreamJob::Process((id, map)));
}
Expand Down
33 changes: 21 additions & 12 deletions program_transformers/src/bubblegum/db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
crate::error::{ProgramTransformerError, ProgramTransformerResult},
das_core::DownloadMetadataInfo,
digital_asset_types::dao::{
asset, asset_authority, asset_creators, asset_data, asset_grouping, backfill_items,
cl_audits_v2, cl_items,
Expand Down Expand Up @@ -403,25 +404,23 @@ pub async fn upsert_asset_data<T>(
chain_data: JsonValue,
metadata_url: String,
metadata_mutability: Mutability,
metadata: JsonValue,
slot_updated: i64,
reindex: Option<bool>,
raw_name: Vec<u8>,
raw_symbol: Vec<u8>,
seq: i64,
) -> ProgramTransformerResult<()>
) -> ProgramTransformerResult<Option<DownloadMetadataInfo>>
where
T: ConnectionTrait + TransactionTrait,
{
let model = asset_data::ActiveModel {
id: ActiveValue::Set(id.clone()),
chain_data_mutability: ActiveValue::Set(chain_data_mutability),
chain_data: ActiveValue::Set(chain_data),
metadata_url: ActiveValue::Set(metadata_url),
metadata_url: ActiveValue::Set(metadata_url.clone()),
metadata_mutability: ActiveValue::Set(metadata_mutability),
metadata: ActiveValue::Set(metadata),
metadata: ActiveValue::Set(JsonValue::String("processing".to_string())),
slot_updated: ActiveValue::Set(slot_updated),
reindex: ActiveValue::Set(reindex),
reindex: ActiveValue::Set(Some(true)),
raw_name: ActiveValue::Set(Some(raw_name)),
raw_symbol: ActiveValue::Set(Some(raw_symbol)),
base_info_seq: ActiveValue::Set(Some(seq)),
Expand All @@ -435,9 +434,7 @@ where
asset_data::Column::ChainData,
asset_data::Column::MetadataUrl,
asset_data::Column::MetadataMutability,
// Don't update asset_data::Column::Metadata if it already exists. Even if we
// are indexing `update_metadata`` and there's a new URI, the new background
// task will overwrite it.
asset_data::Column::Metadata,
asset_data::Column::SlotUpdated,
asset_data::Column::Reindex,
asset_data::Column::RawName,
Expand All @@ -450,15 +447,27 @@ where

// Do not overwrite changes that happened after decompression (asset_data.base_info_seq = 0).
// Do not overwrite changes from a later Bubblegum instruction.
// Do not update the record if the incoming slot is larger than the current or if it's null.
// Update if the current slot on the record is null.
query.sql = format!(
"{} WHERE (asset_data.base_info_seq != 0 AND excluded.base_info_seq >= asset_data.base_info_seq) OR asset_data.base_info_seq IS NULL",
"{} WHERE ((asset_data.base_info_seq != 0 AND excluded.base_info_seq >= asset_data.base_info_seq) OR asset_data.base_info_seq IS NULL) AND (excluded.slot_updated <= asset_data.slot_updated OR asset_data.slot_updated IS NULL)",
query.sql
);
txn.execute(query)

let result = txn
.execute(query)
.await
.map_err(|db_err| ProgramTransformerError::StorageWriteError(db_err.to_string()))?;

Ok(())
if result.rows_affected() > 0 {
Ok(Some(DownloadMetadataInfo::new(
id,
metadata_url,
slot_updated,
)))
} else {
Ok(None)
}
}

#[allow(clippy::too_many_arguments)]
Expand Down
12 changes: 3 additions & 9 deletions program_transformers/src/bubblegum/mint_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use {
},
json::ChainDataV1,
},
sea_orm::{query::JsonValue, ConnectionTrait, TransactionTrait},
sea_orm::{ConnectionTrait, TransactionTrait},
tracing::warn,
};

Expand Down Expand Up @@ -91,16 +91,14 @@ where
// automatically rolled back.
let multi_txn = txn.begin().await?;

upsert_asset_data(
let download_metadata_info = upsert_asset_data(
&multi_txn,
id_bytes.to_vec(),
chain_mutability,
chain_data_json,
uri.clone(),
Mutability::Mutable,
JsonValue::String("processing".to_string()),
slot_i,
Some(true),
name.to_vec(),
symbol.to_vec(),
seq as i64,
Expand Down Expand Up @@ -207,11 +205,7 @@ where
return Ok(None);
}

Ok(Some(DownloadMetadataInfo::new(
id_bytes.to_vec(),
uri,
slot_i,
)))
Ok(download_metadata_info)
}
_ => Err(ProgramTransformerError::NotImplemented),
};
Expand Down
1 change: 1 addition & 0 deletions program_transformers/src/bubblegum/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ where
}
}
}

Err(ProgramTransformerError::ParsingError(
"Ix not parsed correctly".to_string(),
))
Expand Down
12 changes: 3 additions & 9 deletions program_transformers/src/bubblegum/update_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use {
},
json::ChainDataV1,
},
sea_orm::{query::*, ConnectionTrait, JsonValue},
sea_orm::{query::*, ConnectionTrait},
tracing::warn,
};

Expand Down Expand Up @@ -114,16 +114,14 @@ where
// automatically rolled back.
let multi_txn = txn.begin().await?;

upsert_asset_data(
let download_metadata_info = upsert_asset_data(
&multi_txn,
id_bytes.to_vec(),
chain_mutability,
chain_data_json,
uri.clone(),
Mutability::Mutable,
JsonValue::String("processing".to_string()),
slot_i,
Some(true),
name.into_bytes().to_vec(),
symbol.into_bytes().to_vec(),
seq as i64,
Expand Down Expand Up @@ -188,11 +186,7 @@ where
return Ok(None);
}

Ok(Some(DownloadMetadataInfo::new(
id_bytes.to_vec(),
uri,
slot_i,
)))
Ok(download_metadata_info)
}
_ => Err(ProgramTransformerError::NotImplemented),
};
Expand Down

0 comments on commit 7f3b391

Please sign in to comment.