Skip to content

Commit

Permalink
feat: update transactions based on subscriptions events and remove po…
Browse files Browse the repository at this point in the history
…lling (#583)

* fix: only show deployed contract notifications when address changes

* transaction subscription and refresh working

* rename plugin

* cleanup

* move plugin to a hook, rename back to listener, fix notification logic and improve deployment detection

* cleanup logs and todos

* cleanup

* move notify back, update unit tests

* implement tx subscription system

* cleanup, fix tests

* add unit test for listener hook

* remove pending computed test, add unit test for tx refresh

* exclude frontend test folder from sonarcloud analysis

* remove client_session_id from transactions

---------

Co-authored-by: Den <[email protected]>
Co-authored-by: Cristiam Da Silva <[email protected]>
  • Loading branch information
3 people authored Nov 14, 2024
1 parent cb516e4 commit 771db2a
Show file tree
Hide file tree
Showing 22 changed files with 310 additions and 134 deletions.
5 changes: 1 addition & 4 deletions backend/consensus/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,7 @@ async def exec_transaction(
Node,
] = node_factory,
):
msg_handler = self.msg_handler.with_client_session(
transaction.client_session_id
)
msg_handler = self.msg_handler
if (
transactions_processor.get_transaction_by_hash(transaction.hash)["status"]
!= TransactionStatus.PENDING.value
Expand Down Expand Up @@ -361,7 +359,6 @@ async def exec_transaction(
type=TransactionType.RUN_CONTRACT.value,
nonce=nonce,
leader_only=transaction.leader_only, # Cascade
client_session_id=transaction.client_session_id,
triggered_by_hash=transaction.hash,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""remove client session id from transactions
Revision ID: 579e86111b36
Revises: ab256b41602a
Create Date: 2024-11-08 10:41:56.112444
"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = "579e86111b36"
down_revision: Union[str, None] = "ab256b41602a"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("transactions", "client_session_id")
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"transactions",
sa.Column(
"client_session_id",
sa.VARCHAR(length=255),
autoincrement=False,
nullable=True,
),
)
# ### end Alembic commands ###
3 changes: 0 additions & 3 deletions backend/database_handler/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ class Transactions(Base):
created_at: Mapped[Optional[datetime.datetime]] = mapped_column(
DateTime(True), server_default=func.current_timestamp(), init=False
)
client_session_id: Mapped[Optional[str]] = mapped_column(
String(255)
) # Used to identify the client session that is subscribed to this transaction's events
leader_only: Mapped[bool] = mapped_column(Boolean)
r: Mapped[Optional[int]] = mapped_column(Integer)
s: Mapped[Optional[int]] = mapped_column(Integer)
Expand Down
3 changes: 0 additions & 3 deletions backend/database_handler/transactions_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ def _parse_transaction_data(transaction_data: Transactions) -> dict:
"v": transaction_data.v,
"created_at": transaction_data.created_at.isoformat(),
"leader_only": transaction_data.leader_only,
"client_session_id": transaction_data.client_session_id,
"triggered_by": transaction_data.triggered_by_hash,
"triggered_transactions": [
transaction.hash
Expand Down Expand Up @@ -113,7 +112,6 @@ def insert_transaction(
type: int,
nonce: int,
leader_only: bool,
client_session_id: str | None,
triggered_by_hash: (
str | None
) = None, # If filled, the transaction must be present in the database (committed)
Expand Down Expand Up @@ -146,7 +144,6 @@ def insert_transaction(
s=None,
v=None,
leader_only=leader_only,
client_session_id=client_session_id,
triggered_by=(
self.session.query(Transactions).filter_by(hash=triggered_by_hash).one()
if triggered_by_hash
Expand Down
3 changes: 0 additions & 3 deletions backend/domain/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ class Transaction:
leader_only: bool = (
False # Flag to indicate if this transaction should be processed only by the leader. Used for fast and cheap execution of transactions.
)
client_session_id: str | None = None

def to_dict(self):
return {
Expand All @@ -98,7 +97,6 @@ def to_dict(self):
"s": self.s,
"v": self.v,
"leader_only": self.leader_only,
"client_session_id": self.client_session_id,
}


Expand All @@ -119,5 +117,4 @@ def transaction_from_dict(input: dict) -> Transaction:
s=input.get("s"),
v=input.get("v"),
leader_only=input.get("leader_only", False),
client_session_id=input["client_session_id"],
)
3 changes: 1 addition & 2 deletions backend/protocol_rpc/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def fund_account(

nonce = transactions_processor.get_transaction_count(None)
transaction_hash = transactions_processor.insert_transaction(
None, account_address, None, amount, 0, nonce, False, get_client_session_id()
None, account_address, None, amount, 0, nonce, False
)
return transaction_hash

Expand Down Expand Up @@ -485,7 +485,6 @@ def send_raw_transaction(
transaction_type,
nonce,
leader_only,
get_client_session_id(),
)

return transaction_hash
Expand Down
21 changes: 14 additions & 7 deletions backend/protocol_rpc/message_handler/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,20 @@ def log_endpoint_info(self, func):
return log_endpoint_info_wrapper(self, self.config)(func)

def _socket_emit(self, log_event: LogEvent):
self.socketio.emit(
log_event.name,
log_event.to_dict(),
to=log_event.client_session_id
or self.client_session_id
or get_client_session_id(),
)
if log_event.name == "transaction_status_updated":
self.socketio.emit(
log_event.name,
log_event.to_dict(),
room=log_event.data.get("hash"),
)
else:
self.socketio.emit(
log_event.name,
log_event.to_dict(),
to=log_event.client_session_id
or self.client_session_id
or get_client_session_id(),
)

def _log_message(self, log_event: LogEvent):
logging_status = log_event.type.value
Expand Down
13 changes: 12 additions & 1 deletion backend/protocol_rpc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
from flask import Flask
from flask_jsonrpc import JSONRPC
from flask_socketio import SocketIO
from flask_socketio import SocketIO, join_room, leave_room
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import create_engine
Expand Down Expand Up @@ -131,6 +131,17 @@ def run_socketio():
host="0.0.0.0",
allow_unsafe_werkzeug=True,
)

@socketio.on("subscribe")
def handle_subscribe(topics):
for topic in topics:
join_room(topic)

@socketio.on("unsubscribe")
def handle_unsubscribe(topics):
for topic in topics:
leave_room(topic)

logging.getLogger("werkzeug").setLevel(
os.environ.get("FLASK_LOG_LEVEL", logging.ERROR)
)
Expand Down
1 change: 0 additions & 1 deletion frontend/src/components/Simulator/TransactionItem.vue
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ const nodeStore = useNodeStore();
const props = defineProps<{
transaction: TransactionItem;
}>();
console.log('🚀 ~ transaction:', props.transaction);
const isDetailsModalOpen = ref(false);
Expand Down
1 change: 1 addition & 0 deletions frontend/src/hooks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ export * from './useInputMap';
export * from './useContractQueries';
export * from './useFileName';
export * from './useSetupStores';
export * from './useTransactionListener';
6 changes: 5 additions & 1 deletion frontend/src/hooks/useSetupStores.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
useNodeStore,
useTutorialStore,
} from '@/stores';
import { useDb } from '@/hooks';
import { useDb, useTransactionListener } from '@/hooks';
import { v4 as uuidv4 } from 'uuid';
import type { Address } from '@/types';

Expand All @@ -17,6 +17,7 @@ export const useSetupStores = () => {
const nodeStore = useNodeStore();
const tutorialStore = useTutorialStore();
const db = useDb();
const transactionListener = useTransactionListener();
const contractFiles = await db.contractFiles.toArray();
const exampleFiles = contractFiles.filter((c) => c.example);

Expand Down Expand Up @@ -48,6 +49,9 @@ export const useSetupStores = () => {
contractsStore.deployedContracts = await db.deployedContracts.toArray();
transactionsStore.transactions = await db.transactions.toArray();

transactionsStore.initSubscriptions();
transactionsStore.refreshPendingTransactions();
transactionListener.init();
contractsStore.getInitialOpenedFiles();
tutorialStore.resetTutorialState();
nodeStore.getValidatorsData();
Expand Down
40 changes: 40 additions & 0 deletions frontend/src/hooks/useTransactionListener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { useTransactionsStore } from '@/stores';
import type { TransactionItem } from '@/types';
import { useWebSocketClient } from '@/hooks';

export function useTransactionListener() {
const transactionsStore = useTransactionsStore();
const webSocketClient = useWebSocketClient();

function init() {
webSocketClient.on(
'transaction_status_updated',
handleTransactionStatusUpdate,
);
}

async function handleTransactionStatusUpdate(eventData: any) {
const newTx = await transactionsStore.getTransaction(eventData.data.hash);

if (!newTx) {
console.warn('Server tx not found for local tx:', newTx);
transactionsStore.removeTransaction(newTx);
return;
}

const currentTx = transactionsStore.transactions.find(
(t: TransactionItem) => t.hash === eventData.data.hash,
);

if (!currentTx) {
// This happens regularly when local transactions get cleared (e.g. user clears all txs or deploys new contract instance)
return;
}

transactionsStore.updateTransaction(newTx);
}

return {
init,
};
}
5 changes: 1 addition & 4 deletions frontend/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { createPinia } from 'pinia';
import Notifications from '@kyvg/vue3-notification';
import App from './App.vue';
import router from './router';
import { persistStorePlugin, TransactionsListenerPlugin } from '@/plugins';
import { persistStorePlugin } from '@/plugins';
import { VueSpinnersPlugin } from 'vue3-spinners';
import registerGlobalComponents from '@/components/global/registerGlobalComponents';
import { VueQueryPlugin } from '@tanstack/vue-query';
Expand Down Expand Up @@ -32,9 +32,6 @@ app.use(FloatingVue, {
});
app.use(Notifications);
app.use(VueSpinnersPlugin);
app.use(TransactionsListenerPlugin, {
interval: 5000,
});

const plausible = createPlausible({
init: {
Expand Down
1 change: 0 additions & 1 deletion frontend/src/plugins/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export * from './persistStore';
export * from './transactionsListener';
64 changes: 0 additions & 64 deletions frontend/src/plugins/transactionsListener.ts

This file was deleted.

1 change: 1 addition & 0 deletions frontend/src/stores/contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ export const useContractsStore = defineStore('contractsStore', () => {
const index = deployedContracts.value.findIndex(
(c) => c.contractId === contractId,
);

const newItem = { contractId, address, defaultState };

if (index === -1) {
Expand Down
12 changes: 6 additions & 6 deletions frontend/src/stores/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ export const useNodeStore = defineStore('nodeStore', () => {
];

trackedEvents.forEach((eventName) => {
webSocketClient.on(eventName, (data: any) => {
webSocketClient.on(eventName, (eventData: any) => {
addLog({
scope: data.scope,
name: data.name,
type: data.type,
message: data.message,
data: data.data,
scope: eventData.scope,
name: eventData.name,
type: eventData.type,
message: eventData.message,
data: eventData.data,
});
});
});
Expand Down
Loading

0 comments on commit 771db2a

Please sign in to comment.