Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable out-of-order transaction processing for asset table #77

Merged
merged 36 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
ff389cc
Fix docker preparation script to build SPL
danenbm May 24, 2023
9299d79
Update owner and delegate in asset table when collection or creator v…
danenbm May 24, 2023
02d4a05
Modify program transformers to upsert in asset table
danenbm May 25, 2023
63b1e48
Update mint and decompress to be able to upsert asset info out of order
danenbm May 26, 2023
bee5977
Add second sequence number for compression status fields
danenbm May 26, 2023
69f9cc3
Reduce logging in docker
danenbm May 26, 2023
1dfdfd0
Comment out compressed_seq before regenerating Sea ORM objects
danenbm May 26, 2023
ba6b323
Merge branch 'main' into danenbm/update-asset
danenbm May 30, 2023
507b393
Add migration for asset specification
danenbm Jun 1, 2023
634030d
Update README
danenbm Jun 1, 2023
07b6712
Rename PNFT and regenerate Sea ORM types
danenbm Jun 2, 2023
238b043
Apply usage of compressed_seq after regenerating Sea ORM types
danenbm Jun 2, 2023
aa6fe0d
Merge branch 'main' into danenbm/update-asset
danenbm Jun 15, 2023
9eae93b
Add owner delegate sequence number for owner and delegate fields.
danenbm Jun 16, 2023
d4fa87d
Regenerating database types
danenbm Jun 16, 2023
060a2a0
Update handling for non null constrained asset table
danenbm Jun 20, 2023
9e06391
Update tests to use new Sea ORM types
danenbm Jun 20, 2023
b9427cd
Use owner_and_delegate_seq to separate upserts
danenbm Jun 20, 2023
0b030f2
Adding was_decompressed flag to replace compressed_seq
danenbm Jun 20, 2023
d42fca0
Regenerating Sea ORM types
danenbm Jun 20, 2023
ed8f03b
Update code to use was_decompressed flag
danenbm Jun 20, 2023
b1a349e
Fix new boolean SQL conditions
danenbm Jun 20, 2023
4c6e51d
Update comment
danenbm Jun 21, 2023
4c0e259
Remove column updates in asset table during mint for items not in model
danenbm Jun 21, 2023
2b4e5e6
Clippy fixes in ingester main
danenbm Jun 22, 2023
ff0e987
Merge branch 'main' into danenbm/update-asset
danenbm Jun 22, 2023
843dd6b
Cleanup debug comment
danenbm Jun 22, 2023
f13f7d5
Merge branch 'main' into danenbm/update-asset
danenbm Jun 22, 2023
2713a18
Allow for sequence number to be NULL (needed after decompress now)
danenbm Jun 22, 2023
f7a5143
Add leaf specific sequence number to protect that field in asset table
danenbm Jun 23, 2023
bf2d2bc
Revert "Allow for sequence number to be NULL (needed after decompress…
danenbm Jun 23, 2023
eeb56ac
Merge branch 'main' into danenbm/update-asset
danenbm Jun 27, 2023
7c9e1c1
Merge branch 'main' into danenbm/update-asset
danenbm Jun 28, 2023
5b7a33d
Update nft_ingester/src/program_transformers/bubblegum/redeem.rs
danenbm Jul 10, 2023
365a9bc
Merge branch 'main' into danenbm/update-asset
danenbm Jul 17, 2023
1de90db
Merge branch 'main' into danenbm/update-asset
danenbm Jul 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ Then with a local `DATABASE_URL` var exported like this `export DATABASE_URL=pos

If you need to install `sea-orm-cli` run `cargo install sea-orm-cli`.

Note: The current SeaORM types were generated using version 0.9.3 so unless you want to upgrade you can install using `cargo install sea-orm-cli --version 0.9.3`.

Also note: The migration `m20230224_093722_performance_improvements` needs to be commented out of the migration lib.rs in order for the Sea ORM `Relations` to generate correctly.
danenbm marked this conversation as resolved.
Show resolved Hide resolved

#### Developing Locally
*Prerequisites*
* A Postgres Server running with the database setup according to ./init.sql
Expand Down
3 changes: 3 additions & 0 deletions digital_asset_types/src/dao/generated/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub struct Model {
pub slot_updated: i64,
pub data_hash: Option<String>,
pub creator_hash: Option<String>,
pub compressed_seq: Option<i64>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
Expand Down Expand Up @@ -72,6 +73,7 @@ pub enum Column {
SlotUpdated,
DataHash,
CreatorHash,
CompressedSeq,
}

#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)]
Expand Down Expand Up @@ -124,6 +126,7 @@ impl ColumnTrait for Column {
Self::SlotUpdated => ColumnType::BigInteger.def(),
Self::DataHash => ColumnType::Char(Some(50u32)).def().null(),
Self::CreatorHash => ColumnType::Char(Some(50u32)).def().null(),
Self::CompressedSeq => ColumnType::BigInteger.def().null(),
}
}
}
Expand Down
100 changes: 50 additions & 50 deletions digital_asset_types/src/dao/generated/sea_orm_active_enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,29 @@ use serde::{Deserialize, Serialize};
#[sea_orm(
rs_type = "String",
db_type = "Enum",
enum_name = "specification_versions"
enum_name = "v1_account_attachments"
)]
pub enum SpecificationVersions {
pub enum V1AccountAttachments {
#[sea_orm(string_value = "edition")]
Edition,
#[sea_orm(string_value = "edition_marker")]
EditionMarker,
#[sea_orm(string_value = "master_edition_v1")]
MasterEditionV1,
#[sea_orm(string_value = "master_edition_v2")]
MasterEditionV2,
#[sea_orm(string_value = "unknown")]
Unknown,
}
#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "owner_type")]
pub enum OwnerType {
#[sea_orm(string_value = "single")]
Single,
#[sea_orm(string_value = "token")]
Token,
#[sea_orm(string_value = "unknown")]
Unknown,
#[sea_orm(string_value = "v0")]
V0,
#[sea_orm(string_value = "v1")]
V1,
#[sea_orm(string_value = "v2")]
V2,
}
#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(
Expand All @@ -36,18 +48,6 @@ pub enum RoyaltyTargetType {
Unknown,
}
#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")]
pub enum TaskStatus {
#[sea_orm(string_value = "failed")]
Failed,
#[sea_orm(string_value = "pending")]
Pending,
#[sea_orm(string_value = "running")]
Running,
#[sea_orm(string_value = "success")]
Success,
}
#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "chain_mutability")]
pub enum ChainMutability {
#[sea_orm(string_value = "immutable")]
Expand All @@ -58,42 +58,32 @@ pub enum ChainMutability {
Unknown,
}
#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "owner_type")]
pub enum OwnerType {
#[sea_orm(string_value = "single")]
Single,
#[sea_orm(string_value = "token")]
Token,
#[sea_orm(string_value = "unknown")]
Unknown,
}
#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "mutability")]
pub enum Mutability {
#[sea_orm(string_value = "immutable")]
Immutable,
#[sea_orm(string_value = "mutable")]
Mutable,
#[sea_orm(string_value = "unknown")]
Unknown,
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")]
pub enum TaskStatus {
#[sea_orm(string_value = "failed")]
Failed,
#[sea_orm(string_value = "pending")]
Pending,
#[sea_orm(string_value = "running")]
Running,
#[sea_orm(string_value = "success")]
Success,
}
#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(
rs_type = "String",
db_type = "Enum",
enum_name = "v1_account_attachments"
enum_name = "specification_versions"
)]
pub enum V1AccountAttachments {
#[sea_orm(string_value = "edition")]
Edition,
#[sea_orm(string_value = "edition_marker")]
EditionMarker,
#[sea_orm(string_value = "master_edition_v1")]
MasterEditionV1,
#[sea_orm(string_value = "master_edition_v2")]
MasterEditionV2,
pub enum SpecificationVersions {
#[sea_orm(string_value = "unknown")]
Unknown,
#[sea_orm(string_value = "v0")]
V0,
#[sea_orm(string_value = "v1")]
V1,
#[sea_orm(string_value = "v2")]
V2,
}
#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(
Expand All @@ -116,10 +106,20 @@ pub enum SpecificationAssetClass {
Print,
#[sea_orm(string_value = "PRINTABLE_NFT")]
PrintableNft,
#[sea_orm(string_value = "PROGRAMMABLE_NFT")]
ProgrammableNft,
#[sea_orm(string_value = "TRANSFER_RESTRICTED_NFT")]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove later

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is that, because we won't ever use it?

TransferRestrictedNft,
#[sea_orm(string_value = "unknown")]
Unknown,
#[sea_orm(string_value = "PNFT")]
ProgrammableNft,
}
#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "mutability")]
pub enum Mutability {
#[sea_orm(string_value = "immutable")]
Immutable,
#[sea_orm(string_value = "mutable")]
Mutable,
#[sea_orm(string_value = "unknown")]
Unknown,
}
3 changes: 2 additions & 1 deletion digital_asset_types/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,15 @@ pub fn create_asset(
royalty_target_type,
royalty_target,
royalty_amount,
asset_data: Some(id.clone()),
asset_data: Some(id),
burnt: false,
created_at: None,
specification_asset_class: SpecificationAssetClass::Nft,
slot_updated: 0,
data_hash: None,
alt_id: None,
creator_hash: None,
compressed_seq: Some(0),
},
)
}
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ services:
- ./ledger:/config:rw
- ./solana-test-validator-geyser-config:/plugin-config:rw
environment:
RUST_LOG: info
RUST_LOG: error
PLUGIN_MESSENGER_CONFIG.messenger_type: "Redis"
PLUGIN_MESSENGER_CONFIG.connection_config: '{redis_connection_str="redis://redis"}'
ports:
Expand Down
4 changes: 4 additions & 0 deletions migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ mod m20230203_205959_improve_upsert_perf;
mod m20230224_093722_performance_improvements;
mod m20230310_162227_add_indexes_to_bg;
mod m20230317_121944_remove_indexes_for_perf;
mod m20230526_120101_add_compressed_seq;
mod m20230601_120101_add_pnft_enum_val;

pub struct Migrator;

Expand All @@ -39,6 +41,8 @@ impl MigratorTrait for Migrator {
Box::new(m20230224_093722_performance_improvements::Migration),
Box::new(m20230310_162227_add_indexes_to_bg::Migration),
Box::new(m20230317_121944_remove_indexes_for_perf::Migration),
Box::new(m20230526_120101_add_compressed_seq::Migration),
Box::new(m20230601_120101_add_pnft_enum_val::Migration),
]
}
}
2 changes: 1 addition & 1 deletion migration/src/m20230224_093722_performance_improvements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl MigrationTrait for Migration {
Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
Ok(())
}
}
32 changes: 32 additions & 0 deletions migration/src/m20230526_120101_add_compressed_seq.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use digital_asset_types::dao::asset;
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
manager
.alter_table(
Table::alter()
.table(asset::Entity)
.add_column(ColumnDef::new(Alias::new("compressed_seq")).big_integer())
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
manager
.alter_table(
Table::alter()
.table(asset::Entity)
.drop_column(Alias::new("compressed_seq"))
.to_owned(),
)
.await
}
}
29 changes: 29 additions & 0 deletions migration/src/m20230601_120101_add_pnft_enum_val.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use sea_orm_migration::{
prelude::*,
sea_orm::{ConnectionTrait, DatabaseBackend, Statement},
};

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
manager
.get_connection()
.execute(Statement::from_string(
DatabaseBackend::Postgres,
"ALTER TYPE specification_asset_class ADD VALUE IF NOT EXISTS 'PROGRAMMABLE_NFT';"
.to_string(),
))
.await?;

Ok(())
}

async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
Ok(())
}
}
58 changes: 46 additions & 12 deletions nft_ingester/src/program_transformers/bubblegum/burn.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::{
error::IngesterError,
};
use super::{update_asset, save_changelog_event};
use super::{save_changelog_event, upsert_asset_with_leaf_schema};
use crate::error::IngesterError;
use blockbuster::{
instruction::InstructionBundle,
programs::bubblegum::{BubblegumInstruction, LeafSchema},
};
use digital_asset_types::dao::asset;
use sea_orm::{entity::*, ConnectionTrait, TransactionTrait};
use sea_orm::{
entity::*, query::*, sea_query::OnConflict, ConnectionTrait, DbBackend, EntityTrait,
TransactionTrait,
};

pub async fn burn<'c, T>(
parsing_result: &BubblegumInstruction,
Expand All @@ -19,18 +20,51 @@ where
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
let seq = save_changelog_event(cl, bundle.slot, txn).await?;
#[allow(unreachable_patterns)]
return match le.schema {
LeafSchema::V1 { id, .. } => {
LeafSchema::V1 {
id,
delegate,
owner,
..
} => {
let id_bytes = id.to_bytes().to_vec();
let asset_to_update = asset::ActiveModel {
id: Unchanged(id_bytes.clone()),

let asset_model = asset::ActiveModel {
id: Set(id_bytes.to_vec()),
burnt: Set(true),
seq: Set(seq as i64), // gummyroll seq
..Default::default()
};
// Don't send sequence number with this update, because we will always
// run this update even if it's from a backfill/replay.
update_asset(txn, id_bytes, None, asset_to_update).await

// Upsert asset table `burnt` column.
let query = asset::Entity::insert(asset_model)
.on_conflict(
OnConflict::columns([asset::Column::Id])
.update_columns([
asset::Column::Burnt,
//TODO maybe handle slot updated.
])
.to_owned(),
)
.build(DbBackend::Postgres);
txn.execute(query).await?;

// Partial update of asset table with just leaf schema elements.
let delegate = if owner == delegate {
None
} else {
Some(delegate.to_bytes().to_vec())
};
let owner_bytes = owner.to_bytes().to_vec();
upsert_asset_with_leaf_schema(
txn,
id_bytes.clone(),
le.leaf_hash.to_vec(),
delegate,
owner_bytes,
seq as i64,
)
.await
}
_ => Err(IngesterError::NotImplemented),
};
Expand Down
Loading