Skip to content

Commit

Permalink
spline #1155 Introduce secondary transaction ID
Browse files Browse the repository at this point in the history
  • Loading branch information
wajda committed Jul 8, 2024
1 parent 33c48c5 commit a4b9c8d
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 26 deletions.
3 changes: 2 additions & 1 deletion arangodb-foxx-services/src/main/persistence/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ export type TxNum = number
*/
export type WriteTxInfo = {
num: TxNum
uid: TxId
uid: TxId // primary key - universally unique // TODO: double check with ArangoDB if it's indeed unique for entire collection life even if records are constantly removed
sid: TxId // secondary key - for deduplication purpose
params: TxParams
}

Expand Down
18 changes: 10 additions & 8 deletions arangodb-foxx-services/src/main/services/execution-event-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,16 @@ export function storeExecutionEvent(progress: Progress): void {
const progressEdge: Partial<ArangoDB.Edge> =
edge(CollectionName.Progress, progress._key, CollectionName.ExecutionPlan, progress.planKey, progress._key)

const wtxInfo: WriteTxInfo = TxManager.startWrite({
execEventInfo: {
_key: progress._key,
planKey: progress.planKey,
timestamp: progress.timestamp,
error: progress.error,
}
})
const wtxInfo: WriteTxInfo = TxManager.startWrite(
progress._key,
{
execEventInfo: {
_key: progress._key,
planKey: progress.planKey,
timestamp: progress.timestamp,
error: progress.error,
}
})

try {
store.insertOne(progressWithPlanDetails, CollectionName.Progress, wtxInfo)
Expand Down
15 changes: 9 additions & 6 deletions arangodb-foxx-services/src/main/services/execution-plan-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ import { TxManager } from './txm'


export function storeExecutionPlan(eppm: ExecutionPlanPersistentModel): void {
withTimeTracking(`STORE PLAN ${eppm.executionPlan._key}`, () => {
const txInfo: WriteTxInfo = TxManager.startWrite({
execPlanInfo: {
_key: eppm.executionPlan._key,
}
})
const execPlanKey = eppm.executionPlan._key
withTimeTracking(`STORE PLAN ${execPlanKey}`, () => {
const txInfo: WriteTxInfo = TxManager.startWrite(
execPlanKey,
{
execPlanInfo: {
_key: execPlanKey,
}
})

try {
// execution plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import { TxManager } from './tx-manager'
import events from 'events'
import { ReadTxInfo, TxAwareDocument, TxEvent, TxParams, WriteTxInfo } from '../../persistence/model'
import { ReadTxInfo, TxAwareDocument, TxEvent, TxId, TxParams, WriteTxInfo } from '../../persistence/model'
import * as Logger from '../../utils/logger'


Expand Down Expand Up @@ -64,8 +64,8 @@ export class SubscribableTxManagerDecorator implements TxManager {
return this.internalTxManager.startRead()
}

startWrite(txParams: TxParams): WriteTxInfo {
const wtxInfo = this.internalTxManager.startWrite(txParams)
startWrite(sid: TxId, txParams: TxParams): WriteTxInfo {
const wtxInfo = this.internalTxManager.startWrite(sid, txParams)
this.eventsEmitter.emit(TxEvent.StartWrite, wtxInfo)
return wtxInfo
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class TxManagerImpl implements TxManager {
throw new Error(`Failed to obtain a new Tx number after ${MAX_GET_TX_NUM_ATTEMPTS} attempts.`)
}

startWrite(txParams: TxParams = {}): WriteTxInfo {
startWrite(sid: TxId, txParams: TxParams = {}): WriteTxInfo {
// The following steps must be executed in the given exact order
// (opposite to one for the READ transaction) as follows:

Expand All @@ -91,6 +91,7 @@ export class TxManagerImpl implements TxManager {
const wtxInfo: WriteTxInfo = {
num: txNum,
uid: txId,
sid: sid,
params: txParams,
}
Logger.debug('[TX] WRITE STARTED', wtxInfo)
Expand Down
9 changes: 6 additions & 3 deletions arangodb-foxx-services/src/main/services/txm/tx-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@
* limitations under the License.
*/

import { ReadTxInfo, TxAwareDocument, TxParams, WriteTxInfo } from '../../persistence/model'
import { ReadTxInfo, TxAwareDocument, TxId, TxParams, WriteTxInfo } from '../../persistence/model'


export interface TxManager {
/**
* Start new logical transaction for WRITE
* @param sid secondary transaction ID - serves for deduplication purposes.
* It MUST be unique if and only if the data written by the transaction is logically unique.
* @param txParams additional parameters associated and stored with this transaction info.
* @return new WRITE transaction metadata
*/
startWrite(txParams: TxParams): WriteTxInfo
startWrite(sid: TxId, txParams: TxParams): WriteTxInfo

/**
* Start new logical transaction for READ
Expand All @@ -45,7 +48,7 @@ export interface TxManager {
/**
* Checks if all the `docs` are visible from the `rtx` READ transaction.
* The document is visible from the READ transaction if:
* = the document was created out of any logical transaction, OR
* - the document was created out of any logical transaction, OR
* - the doc's associated WRITE transaction has been committed
* before the current READ one started.
* @param rtx Read transaction info object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { ReadTxInfo, TxAwareDocument, TxEvent, TxParams, WriteTxInfo } from '../../../../src/main/persistence/model'
import { ReadTxInfo, TxAwareDocument, TxEvent, TxId, TxParams, WriteTxInfo } from '../../../../src/main/persistence/model'
import { SubscribableTxManagerDecorator } from '../../../../src/main/services/txm/subcribable-tx-manager-decorator'


Expand Down Expand Up @@ -60,11 +60,12 @@ test('startRead() should be delegated', () => {
})

test('startWrite() should be delegated', () => {
const dummySID: TxId = "dummy_tx_sid"
const dummyTxParams: TxParams = {}
const dummyResult = {}
mockTxManagerImpl.startWrite.mockReturnValue(dummyResult)

const res = txManagerDecorator.startWrite(dummyTxParams)
const res = txManagerDecorator.startWrite(dummySID, dummyTxParams)
expect(res).toBe(dummyResult)

expect(mockTxManagerImpl.startRead.mock.calls.length).toBe(0)
Expand All @@ -73,7 +74,8 @@ test('startWrite() should be delegated', () => {
expect(mockTxManagerImpl.rollback.mock.calls.length).toBe(0)
expect(mockTxManagerImpl.isVisibleFromTx.mock.calls.length).toBe(0)

expect(mockTxManagerImpl.startWrite.mock.lastCall[0]).toBe(dummyTxParams)
expect(mockTxManagerImpl.startWrite.mock.lastCall[0]).toBe(dummySID)
expect(mockTxManagerImpl.startWrite.mock.lastCall[1]).toBe(dummyTxParams)
})

test('commit() should be delegated', () => {
Expand Down Expand Up @@ -133,7 +135,7 @@ test('on(TX_START_WRITE)', () => {
const dummyTxInfo: WriteTxInfo = {}
mockTxManagerImpl.startWrite.mockReturnValue(dummyTxInfo)

txManagerDecorator.startWrite({})
txManagerDecorator.startWrite(null, null)

expect(startWriteHandler.mock.calls.length).toBe(1)
expect(preCommitHandler.mock.calls.length).toBe(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ object CollectionDef {
override def collectionType = CollectionType.DOCUMENT

override def name: String = "txInfo"

override def indexDefs: Seq[IndexDef] = Seq(
IndexDef(Seq("sid"), (new PersistentIndexOptions).unique(true))
)
}

}
Expand Down

0 comments on commit a4b9c8d

Please sign in to comment.