diff --git a/storage/innobase/clone/clone0copy.cc b/storage/innobase/clone/clone0copy.cc index c7d488f1dbd1..63306a6a9d60 100644 --- a/storage/innobase/clone/clone0copy.cc +++ b/storage/innobase/clone/clone0copy.cc @@ -327,14 +327,14 @@ int Clone_Snapshot::update_binlog_position() { } int Clone_Snapshot::wait_trx_end(THD *thd, trx_id_t trx_id) { - auto trx = trx_rw_is_active(trx_id, nullptr, false); + auto trx = trx_sys->rw_trx_hash.find(trx_id, false); if (trx == nullptr) { return (0); } auto wait_cond = [&](bool alert, bool &result) { /* Check if transaction is still active. */ - auto trx = trx_rw_is_active(trx_id, nullptr, false); + auto trx = trx_sys->rw_trx_hash.find(trx_id, false); if (trx == nullptr) { result = false; return (0); diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index fc194d2e4939..4c73ce703336 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -652,6 +652,7 @@ static PSI_mutex_info all_innodb_mutexes[] = { #endif /* UNIV_DEBUG */ PSI_MUTEX_KEY(rw_lock_list_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(rw_lock_mutex, 0, 0, PSI_DOCUMENT_ME), + PSI_MUTEX_KEY(rw_trx_hash_element_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(srv_innodb_monitor_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(srv_misc_tmpfile_mutex, 0, 0, PSI_DOCUMENT_ME), PSI_MUTEX_KEY(srv_monitor_file_mutex, 0, 0, PSI_DOCUMENT_ME), @@ -19322,6 +19323,22 @@ static bool innodb_buffer_pool_size_validate(THD *thd, return true; } +/** + Gets current trx. + + This function may be called during InnoDB initialisation, when + innodb_hton_ptr->slot is not yet set to meaningful value. +*/ +trx_t *current_trx() { + THD *thd = current_thd; + if (likely(thd != nullptr) && innodb_hton_ptr->slot != HA_SLOT_UNDEF) { + trx_t *&trx = thd_to_trx(thd); + return (trx); + } else { + return (nullptr); + } +} + /** Update the system variable innodb_buffer_pool_size using the "saved" value. 
This function is registered as a callback with MySQL. @param[in] thd thread handle diff --git a/storage/innobase/include/row0vers.h b/storage/innobase/include/row0vers.h index b6cc006b126c..f06eb45e6f33 100644 --- a/storage/innobase/include/row0vers.h +++ b/storage/innobase/include/row0vers.h @@ -48,6 +48,7 @@ class ReadView; /** Finds out if an active transaction has inserted or modified a secondary index record. + @param[in,out] caller_trx trx of current thread @param[in] rec record in a secondary index @param[in] index the secondary index @param[in] offsets rec_get_offsets(rec, index) @@ -56,8 +57,8 @@ class ReadView; negatives. The caller must confirm all positive results by checking if the trx is still active. */ -trx_t *row_vers_impl_x_locked(const rec_t *rec, const dict_index_t *index, - const ulint *offsets); +trx_t *row_vers_impl_x_locked(trx_t *caller_trx, const rec_t *rec, + const dict_index_t *index, const ulint *offsets); /** Finds out if we must preserve a delete marked earlier version of a clustered index record, because it is >= the purge view. @@ -119,6 +120,7 @@ dberr_t row_vers_build_for_consistent_read( /** Constructs the last committed version of a clustered index record, which should be seen by a semi-consistent read. 
*/ void row_vers_build_for_semi_consistent_read( + trx_t* caller_trx, /*!< in/out: trx of current thread */ const rec_t *rec, /*!< in: record in a clustered index; the caller must have a latch on the page; this latch locks the top of the stack of versions diff --git a/storage/innobase/include/sync0sync.h b/storage/innobase/include/sync0sync.h index cab905f81146..46fcb564a997 100644 --- a/storage/innobase/include/sync0sync.h +++ b/storage/innobase/include/sync0sync.h @@ -175,6 +175,7 @@ extern mysql_pfs_key_t clone_task_mutex_key; extern mysql_pfs_key_t clone_snapshot_mutex_key; extern mysql_pfs_key_t parallel_read_mutex_key; extern mysql_pfs_key_t dblwr_mutex_key; +extern mysql_pfs_key_t rw_trx_hash_element_mutex_key; #endif /* UNIV_PFS_MUTEX */ #ifdef UNIV_PFS_RWLOCK diff --git a/storage/innobase/include/sync0types.h b/storage/innobase/include/sync0types.h index 31a42c5c98ce..608fc7a44fb9 100644 --- a/storage/innobase/include/sync0types.h +++ b/storage/innobase/include/sync0types.h @@ -275,6 +275,7 @@ enum latch_level_t { SYNC_THREADS, SYNC_TRX, SYNC_POOL, + SYNC_RW_TRX_HASH_ELEMENT, SYNC_POOL_MANAGER, SYNC_TRX_SYS, SYNC_LOCK_SYS_SHARDED, @@ -466,6 +467,7 @@ enum latch_id_t { LATCH_ID_DBLR, LATCH_ID_REDO_LOG_ARCHIVE_ADMIN_MUTEX, LATCH_ID_REDO_LOG_ARCHIVE_QUEUE_MUTEX, + LATCH_ID_RW_TRX_HASH_ELEMENT, LATCH_ID_TEST_MUTEX, LATCH_ID_MAX = LATCH_ID_TEST_MUTEX }; diff --git a/storage/innobase/include/trx0sys.h b/storage/innobase/include/trx0sys.h index 49aed07851cb..871f24a5c09e 100644 --- a/storage/innobase/include/trx0sys.h +++ b/storage/innobase/include/trx0sys.h @@ -49,6 +49,7 @@ this program; if not, write to the Free Software Foundation, Inc., #include #include #include "trx0trx.h" +#include "lf.h" #ifndef UNIV_HOTBACKUP typedef UT_LIST_BASE_NODE_T(trx_t) trx_ut_list_t; @@ -60,6 +61,239 @@ class ReadView; /** The transaction system */ extern trx_sys_t *trx_sys; +trx_t *current_trx(); + +struct rw_trx_hash_element_t { + rw_trx_hash_element_t() : id(0), trx(nullptr) 
{ + mutex_create(LATCH_ID_RW_TRX_HASH_ELEMENT, &mutex); + } + + ~rw_trx_hash_element_t() { mutex_free(&mutex); } + + trx_id_t id; /* lf_hash_init() relies on this to be first in the struct */ + trx_t *trx; + ib_mutex_t mutex; +}; + +/** + Wrapper around LF_HASH to store set of in memory read-write transactions. +*/ + +class rw_trx_hash_t { + LF_HASH hash; + + /** + Constructor callback for lock-free allocator. + + Object is just allocated and is not yet accessible via rw_trx_hash by + concurrent threads. Object can be reused multiple times before it is freed. + Every time object is being reused initializer() callback is called. + */ + + static void rw_trx_hash_constructor(uchar *arg) { + new (arg + LF_HASH_OVERHEAD) rw_trx_hash_element_t(); + } + + /** + Destructor callback for lock-free allocator. + + Object is about to be freed and is not accessible via rw_trx_hash by + concurrent threads. + */ + + static void rw_trx_hash_destructor(uchar *arg) { + reinterpret_cast(arg + LF_HASH_OVERHEAD) + ->~rw_trx_hash_element_t(); + } + + /** + Initializer callback for lock-free hash. + + Object is not yet accessible via rw_trx_hash by concurrent threads, but is + about to become such. Object id can be changed only by this callback and + remains the same until all pins to this object are released. + + Object trx can be changed to 0 by erase() under object mutex protection, + which indicates it is about to be removed from lock-free hash and become + not accessible by concurrent threads. + */ + + static void rw_trx_hash_initializer(rw_trx_hash_element_t *element, + trx_t *trx) { + element->trx = trx; + element->id = trx->id; + trx->rw_trx_hash_element = element; + } + + /** + Gets LF_HASH pins. + + Pins are used to protect object from being destroyed or reused. They are + normally stored in trx object for quick access. If caller doesn't have trx + available, we try to get it using currnet_trx(). If caller doesn't have trx + at all, temporary pins are allocated. 
+ */ + + LF_PINS *get_pins(trx_t *trx) { + if (!trx->rw_trx_hash_pins) { + trx->rw_trx_hash_pins = lf_hash_get_pins(&hash); + ut_a(trx->rw_trx_hash_pins != nullptr); + } + return trx->rw_trx_hash_pins; + } + + public: + void init() { + lf_hash_init(&hash, sizeof(rw_trx_hash_element_t), LF_HASH_UNIQUE, 0, + sizeof(trx_id_t), nullptr, &my_charset_bin); + hash.alloc.constructor = rw_trx_hash_constructor; + hash.alloc.destructor = rw_trx_hash_destructor; + hash.initialize = + reinterpret_cast(rw_trx_hash_initializer); + } + + void destroy() { lf_hash_destroy(&hash); } + + /** + Releases LF_HASH pins. + + Must be called by thread that owns trx_t object when the latter is being + "detached" from thread (e.g. released to the pool by trx_free()). Can be + called earlier if thread is expected not to use rw_trx_hash. + + Since pins are not allowed to be transferred to another thread, + initialisation thread calls this for recovered transactions. + */ + + void put_pins(trx_t *trx) { + if (trx->rw_trx_hash_pins) { + lf_hash_put_pins(trx->rw_trx_hash_pins); + trx->rw_trx_hash_pins = nullptr; + } + } + +#ifdef UNIV_DEBUG + static void validate_element(trx_t *trx) { + mutex_enter(&trx->mutex); + ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE) || + trx_state_eq(trx, TRX_STATE_PREPARED)); + mutex_exit(&trx->mutex); + } +#endif // UNIV_DEBUG + + /** + Finds trx object in lock-free hash with given id. + + Only ACTIVE or PREPARED trx objects may participate in hash. Nevertheless + the transaction may get committed before this method returns. + + With do_ref_count == false the caller may dereference returned trx pointer + only if lock_sys->mutex was acquired before calling find(). + + With do_ref_count == true caller may dereference trx even if it is not + holding lock_sys->mutex. Caller is responsible for calling + trx_t::release_reference() when it is done playing with trx. 
+ + Ideally this method should get caller rw_trx_hash_pins along with trx + object as a parameter, similar to insert() and erase(). However most + callers lose trx early in their call chains and it is not that easy to pass + them through. + + So we take more expensive approach: get trx through current_thd()->ha_data. + Some threads don't have trx attached to THD, and at least server + initialisation thread, fts_optimize_thread, srv_master_thread, + dict_stats_thread, srv_monitor_thread, btr_defragment_thread don't even + have THD at all. For such cases we allocate pins only for duration of + search and free them immediately. + + This has negative performance impact and should be fixed eventually (by + passing caller_trx as a parameter). Still stream of DML is more or less Ok. + + @return + @retval 0 not found + @retval pointer to trx + */ + + trx_t *find(trx_t *caller_trx, trx_id_t trx_id, bool do_ref_count = false) { + if (!trx_id) { + return nullptr; + } + + trx_t *trx = nullptr; + LF_PINS *pins = caller_trx ? get_pins(caller_trx) : lf_hash_get_pins(&hash); + ut_a(pins != nullptr); + + auto element = reinterpret_cast( + lf_hash_search(&hash, pins, reinterpret_cast(&trx_id), + sizeof(trx_id_t))); + if (element) { + mutex_enter(&element->mutex); + lf_hash_search_unpin(pins); + if ((trx = element->trx)) { + ut_d(validate_element(trx)); + if (do_ref_count) { + trx->reference(); + } + } + mutex_exit(&element->mutex); + } + else { + lf_hash_search_unpin(pins); + } + if (!caller_trx) { + lf_hash_put_pins(pins); + } + return trx; + } + + trx_t *find(trx_id_t trx_id, bool do_ref_count = false) { + return find(current_trx(), trx_id, do_ref_count); + } + + /** + Inserts trx to lock-free hash. + + Object becomes accessible via rw_trx_hash. + */ + + void insert(trx_t *trx) { + ut_d(validate_element(trx)); + int res = + lf_hash_insert(&hash, get_pins(trx), reinterpret_cast(trx)); + ut_a(res == 0); + } + + /** + Removes trx from lock-free hash. 
+ + Object becomes not accessible via rw_trx_hash. But it still can be pinned + by concurrent find(), which is supposed to release it immediately after + it sees object trx is 0. + */ + + void erase(trx_t *trx) { + ut_d(validate_element(trx)); + mutex_enter(&trx->rw_trx_hash_element->mutex); + trx->rw_trx_hash_element->trx = nullptr; + mutex_exit(&trx->rw_trx_hash_element->mutex); + int res = lf_hash_delete(&hash, get_pins(trx), + reinterpret_cast(&trx->id), + sizeof(trx_id_t)); + ut_a(res == 0); + } + + /** + Returns the number of elements in the hash. + + The number is exact only if hash is protected against concurrent + modifications (e.g. single threaded startup or hash is protected + by some mutex). Otherwise the number may be used as a hint only, + because it may change even before this method returns. + */ + + int32_t size() { return hash.count.load(std::memory_order_relaxed); } +}; + /** Checks if a page address is the trx sys header page. @param[in] page_id page id @return true if trx sys header page */ @@ -162,13 +396,6 @@ UNIV_INLINE trx_id_t trx_read_trx_id( const byte *ptr); /*!< in: pointer to memory from where to read */ -/** Looks for the trx handle with the given id in rw trxs list. - The caller must be holding trx_sys->mutex. - @param[in] trx_id trx id to search for - @return the trx handle or NULL if not found */ -UNIV_INLINE -trx_t *trx_get_rw_trx_by_id(trx_id_t trx_id); - /** Returns the minimum trx id in rw trx list. This is the smallest id for which the trx can possibly be active. (But, you must look at the trx->state to find out if the minimum trx id transaction itself is active, or already @@ -177,39 +404,6 @@ trx_t *trx_get_rw_trx_by_id(trx_id_t trx_id); UNIV_INLINE trx_id_t trx_rw_min_trx_id(void); -/** Checks if a rw transaction with the given id is active. 
-@param[in] trx_id trx id of the transaction -@param[in] corrupt NULL or pointer to a flag that will be set if - corrupt -@return transaction instance if active, or NULL */ -UNIV_INLINE -trx_t *trx_rw_is_active_low(trx_id_t trx_id, ibool *corrupt); - -/** Checks if a rw transaction with the given id is active. -Please note, that positive result means only that the trx was active -at some moment during the call, but it might have already become -TRX_STATE_COMMITTED_IN_MEMORY before the call returns to the caller, as this -transition is protected by trx->mutex and trx_sys->mutex, but it is impossible -for the caller to hold any of these mutexes when calling this function as the -function itself internally acquires trx_sys->mutex which would cause recurrent -mutex acquisition if caller already had trx_sys->mutex, or latching order -violation in case of holding trx->mutex. -@param[in] trx_id trx id of the transaction -@param[in] corrupt NULL or pointer to a flag that will be set if - corrupt -@param[in] do_ref_count if true then increment the trx_t::n_ref_count -@return transaction instance if active, or NULL; */ -UNIV_INLINE -trx_t *trx_rw_is_active(trx_id_t trx_id, ibool *corrupt, bool do_ref_count); - -#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG -/** Assert that a transaction has been recovered. - @return true */ -UNIV_INLINE -ibool trx_assert_recovered(trx_id_t trx_id) /*!< in: transaction identifier */ - MY_ATTRIBUTE((warn_unused_result)); -#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */ - /** Persist transaction number limit below which all transaction GTIDs are persisted to disk table. @param[in] gtid_trx_no transaction number */ @@ -259,11 +453,6 @@ Check if there are any active (non-prepared) transactions. 
@return total number of active transactions or 0 if none */ ulint trx_sys_any_active_transactions(void); #endif /* !UNIV_HOTBACKUP */ -/** -Add the transaction to the RW transaction set -@param trx transaction instance to add */ -UNIV_INLINE -void trx_sys_rw_trx_add(trx_t *trx); #ifdef UNIV_DEBUG /** Validate the trx_sys_t::rw_trx_list. @@ -448,13 +637,13 @@ struct trx_sys_t { transactions added for purge. */ #endif /* UNIV_DEBUG */ - char pad1[64]; /*!< To avoid false sharing */ + char pad1[ut::INNODB_CACHE_LINE_SIZE]; /*!< To avoid false sharing */ trx_ut_list_t rw_trx_list; /*!< List of active and committed in memory read-write transactions, sorted on trx id, biggest first. Recovered transactions are always on this list. */ - char pad2[64]; /*!< To avoid false sharing */ + char pad2[ut::INNODB_CACHE_LINE_SIZE]; /*!< To avoid false sharing */ trx_ut_list_t mysql_trx_list; /*!< List of transactions created for MySQL. All user transactions are on mysql_trx_list. The rw_trx_list @@ -474,7 +663,7 @@ struct trx_sys_t { to ensure right order of removal and consistent snapshot. */ - char pad3[64]; /*!< To avoid false sharing */ + char pad3[ut::INNODB_CACHE_LINE_SIZE]; /*!< To avoid false sharing */ Rsegs rsegs; /*!< Vector of pointers to rollback segments. These rsegs are iterated @@ -498,8 +687,14 @@ struct trx_sys_t { transactions), protected by rseg->mutex */ - TrxIdSet rw_trx_set; /*!< Mapping from transaction id - to transaction instance */ + const char rw_trx_hash_pre_pad[ut::INNODB_CACHE_LINE_SIZE]; + + /** + Lock-free hash of in memory read-write transactions. 
+ */ + rw_trx_hash_t rw_trx_hash; + + const char rw_trx_hash_post_pad[ut::INNODB_CACHE_LINE_SIZE]; ulint n_prepared_trx; /*!< Number of transactions currently in the XA PREPARED state */ diff --git a/storage/innobase/include/trx0sys.ic b/storage/innobase/include/trx0sys.ic index 3dab5984b1cc..3ea606d0f26d 100644 --- a/storage/innobase/include/trx0sys.ic +++ b/storage/innobase/include/trx0sys.ic @@ -179,22 +179,6 @@ trx_id_t trx_read_trx_id( return (mach_read_from_6(ptr)); } -UNIV_INLINE -trx_t *trx_get_rw_trx_by_id(trx_id_t trx_id) { - ut_ad(trx_id > 0); - ut_ad(trx_sys_mutex_own()); - - if (trx_sys->rw_trx_set.empty()) { - return (nullptr); - } - - TrxIdSet::iterator it; - - it = trx_sys->rw_trx_set.find(TrxTrack(trx_id)); - - return (it == trx_sys->rw_trx_set.end() ? nullptr : it->m_trx); -} - /** Returns the minimum trx id in trx list. This is the smallest id for which the trx can possibly be active. (But, you must look at the trx->state to find out if the minimum trx id transaction itself is active, or already @@ -218,25 +202,6 @@ trx_id_t trx_rw_min_trx_id_low(void) { return (id); } -#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG -/** Assert that a transaction has been recovered. - @return true */ -UNIV_INLINE -ibool trx_assert_recovered(trx_id_t trx_id) /*!< in: transaction identifier */ -{ - const trx_t *trx; - - trx_sys_mutex_enter(); - - trx = trx_get_rw_trx_by_id(trx_id); - ut_a(trx->is_recovered); - - trx_sys_mutex_exit(); - - return (TRUE); -} -#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */ - /** Returns the minimum trx id in rw trx list. This is the smallest id for which the rw trx can possibly be active. 
(But, you must look at the trx->state to find out if the minimum trx id transaction itself is active, or already @@ -253,57 +218,6 @@ trx_id_t trx_rw_min_trx_id(void) { return (id); } -UNIV_INLINE -trx_t *trx_rw_is_active_low(trx_id_t trx_id, ibool *corrupt) { - trx_t *trx; - - ut_ad(trx_sys_mutex_own()); - - if (trx_id < trx_rw_min_trx_id_low()) { - trx = nullptr; - } else if (trx_id >= trx_sys->max_trx_id) { - /* There must be corruption: we let the caller handle the - diagnostic prints in this case. */ - - trx = nullptr; - if (corrupt != nullptr) { - *corrupt = TRUE; - } - } else { - trx = trx_get_rw_trx_by_id(trx_id); - /* We remove trx from rw trxs list and change state to - TRX_STATE_COMMITTED_IN_MEMORY in a same critical section protected by - trx_sys->mutex, which we happen to hold here, so we expect the state of trx - to match its presence in that list */ - ut_ad(trx == nullptr || !trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY)); - } - - return (trx); -} - -UNIV_INLINE -trx_t *trx_rw_is_active(trx_id_t trx_id, ibool *corrupt, bool do_ref_count) { - trx_t *trx; - - /* Fast checking. If it's smaller than minimal active trx id, just - return NULL. */ - if (trx_sys->min_active_id.load() > trx_id) { - return (nullptr); - } - - trx_sys_mutex_enter(); - - trx = trx_rw_is_active_low(trx_id, corrupt); - - if (trx != nullptr) { - trx = trx_reference(trx, do_ref_count); - } - - trx_sys_mutex_exit(); - - return (trx); -} - /** Allocates a new transaction id. 
@return new, allocated trx id */ UNIV_INLINE @@ -363,15 +277,4 @@ bool trx_sys_need_rollback() { return (n_trx > 0); } -/** -Add the transaction to the RW transaction set -@param trx transaction instance to add */ -UNIV_INLINE -void trx_sys_rw_trx_add(trx_t *trx) { - ut_ad(trx->id != 0); - - trx_sys->rw_trx_set.insert(TrxTrack(trx->id, trx)); - ut_d(trx->in_rw_trx_list = true); -} - #endif /* !UNIV_HOTBACKUP */ diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h index 94d5f8ffef72..96a277db12a8 100644 --- a/storage/innobase/include/trx0trx.h +++ b/storage/innobase/include/trx0trx.h @@ -53,6 +53,7 @@ this program; if not, write to the Free Software Foundation, Inc., #include "fts0fts.h" #endif /* !UNIV_HOTBACKUP */ #include "srv0srv.h" +#include "lf.h" // Forward declaration struct mtr_t; @@ -63,6 +64,8 @@ class ReadView; // Forward declaration class FlushObserver; +struct rw_trx_hash_element_t; + /** Dummy session used currently in MySQL interface */ extern sess_t *trx_dummy_sess; @@ -370,26 +373,6 @@ tagged as such. @param[in,out] trx Transaction that needs to be "upgraded" to RW from RO */ void trx_set_rw_mode(trx_t *trx); -/** -Increase the reference count. If the transaction is in state -TRX_STATE_COMMITTED_IN_MEMORY then the transaction is considered -committed and the reference count is not incremented. -@param trx Transaction that is being referenced -@param do_ref_count Increment the reference iff this is true -@return transaction instance if it is not committed */ -UNIV_INLINE -trx_t *trx_reference(trx_t *trx, bool do_ref_count); - -/** -Release the transaction. Decrease the reference count. -@param trx Transaction that is being released */ -UNIV_INLINE -void trx_release_reference(trx_t *trx); - -/** -Check if the transaction is being referenced. 
*/ -#define trx_is_referenced(t) ((t)->n_ref > 0) - /** @param[in] requestor Transaction requesting the lock @param[in] holder Transaction holding the lock @@ -1195,13 +1178,12 @@ struct trx_t { const char *start_file; /*!< Filename where it was started */ #endif /* UNIV_DEBUG */ - lint n_ref; /*!< Count of references, protected - by trx_t::mutex. We can't release the - locks nor commit the transaction until - this reference is 0. We can change - the state to COMMITTED_IN_MEMORY to - signify that it is no longer - "active". */ + /** + Count of references. + We can't release the locks nor commit the transaction until this reference + is 0. We can change the state to COMMITTED_IN_MEMORY to signify + that it is no longer "active". */ + std::atomic_long n_ref; /** Version of this instance. It is incremented each time the instance is re-used in trx_start_low(). It is used to track @@ -1230,6 +1212,8 @@ struct trx_t { doing Non-locking Read-only Read Committed on DD tables */ #endif /* UNIV_DEBUG */ + rw_trx_hash_element_t *rw_trx_hash_element; + LF_PINS *rw_trx_hash_pins; ulint magic_n; bool is_read_uncommitted() const { @@ -1250,6 +1234,24 @@ struct trx_t { } bool allow_semi_consistent() const { return (skip_gap_locks()); } + + bool is_referenced() { return n_ref.load(std::memory_order_relaxed) > 0; } + + void reference() { +#ifdef UNIV_DEBUG + auto old_n_ref = +#endif + n_ref.fetch_add(1, std::memory_order_relaxed); + ut_ad(old_n_ref >= 0); + } + + void release_reference() { +#ifdef UNIV_DEBUG + auto old_n_ref = +#endif + n_ref.fetch_sub(1, std::memory_order_relaxed); + ut_ad(old_n_ref > 0); + } }; #ifndef UNIV_HOTBACKUP diff --git a/storage/innobase/include/trx0trx.ic b/storage/innobase/include/trx0trx.ic index 6171a1f5f10d..1b0eb83915b8 100644 --- a/storage/innobase/include/trx0trx.ic +++ b/storage/innobase/include/trx0trx.ic @@ -225,44 +225,6 @@ bool trx_is_rseg_assigned(const trx_t *trx) /*!< in: transaction */ trx->rsegs.m_noredo.rseg != nullptr); } -/** 
-Increase the reference count. If the transaction is in state -TRX_STATE_COMMITTED_IN_MEMORY then the transaction is considered -committed and the reference count is not incremented. -@param trx Transaction that is being referenced -@param do_ref_count Increment the reference iff this is true -@return transaction instance if it is not committed */ -UNIV_INLINE -trx_t *trx_reference(trx_t *trx, bool do_ref_count) { - trx_mutex_enter(trx); - - if (trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY)) { - trx_mutex_exit(trx); - trx = nullptr; - } else if (do_ref_count) { - ut_ad(trx->n_ref >= 0); - ++trx->n_ref; - trx_mutex_exit(trx); - } else { - trx_mutex_exit(trx); - } - - return (trx); -} - -/** -Release the transaction. Decrease the reference count. -@param trx Transaction that is being released */ -UNIV_INLINE -void trx_release_reference(trx_t *trx) { - trx_mutex_enter(trx); - - ut_ad(trx->n_ref > 0); - --trx->n_ref; - - trx_mutex_exit(trx); -} - /** @param trx Get the active view for this transaction, if one exists @return the transaction's read view or NULL if one not assigned. */ diff --git a/storage/innobase/include/trx0types.h b/storage/innobase/include/trx0types.h index 88988b0cf630..dca3e5b83226 100644 --- a/storage/innobase/include/trx0types.h +++ b/storage/innobase/include/trx0types.h @@ -585,40 +585,6 @@ typedef std::priority_queue< typedef std::vector> trx_ids_t; -/** Mapping read-write transactions from id to transaction instance, for -creating read views and during trx id lookup for MVCC and locking. 
*/ -struct TrxTrack { - explicit TrxTrack(trx_id_t id, trx_t *trx = nullptr) : m_id(id), m_trx(trx) { - // Do nothing - } - - trx_id_t m_id; - trx_t *m_trx; -}; - -struct TrxTrackHash { - size_t operator()(const TrxTrack &key) const { return (size_t(key.m_id)); } -}; - -/** -Comparator for TrxMap */ -struct TrxTrackHashCmp { - bool operator()(const TrxTrack &lhs, const TrxTrack &rhs) const { - return (lhs.m_id == rhs.m_id); - } -}; - -/** -Comparator for TrxMap */ -struct TrxTrackCmp { - bool operator()(const TrxTrack &lhs, const TrxTrack &rhs) const { - return (lhs.m_id < rhs.m_id); - } -}; - -// typedef std::unordered_set TrxIdSet; -typedef std::set> TrxIdSet; - struct TrxVersion { TrxVersion(trx_t *trx); diff --git a/storage/innobase/include/ut0cpu_cache.h b/storage/innobase/include/ut0cpu_cache.h index 40053e02dc3f..58bd06da4e39 100644 --- a/storage/innobase/include/ut0cpu_cache.h +++ b/storage/innobase/include/ut0cpu_cache.h @@ -34,7 +34,7 @@ Utilities related to CPU cache. */ namespace ut { /** CPU cache line size */ -#ifdef __powerpc__ +#if defined(__powerpc__) || defined(__aarch64__) constexpr size_t INNODB_CACHE_LINE_SIZE = 128; #else constexpr size_t INNODB_CACHE_LINE_SIZE = 64; diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc index ced2e35f2d31..818fb6d7b086 100644 --- a/storage/innobase/lock/lock0lock.cc +++ b/storage/innobase/lock/lock0lock.cc @@ -850,6 +850,7 @@ static const lock_t *lock_rec_other_has_conflicting( /** Checks if some transaction has an implicit x-lock on a record in a secondary index. + @param[in,out] caller_trx trx of current thread @param[in] rec user record @param[in] index secondary index @param[in] offsets rec_get_offsets(rec, index) @@ -857,7 +858,8 @@ static const lock_t *lock_rec_other_has_conflicting( NOTE that this function can return false positives but never false negatives. The caller must confirm all positive results by checking if the trx is still active. 
*/ -static trx_t *lock_sec_rec_some_has_impl(const rec_t *rec, dict_index_t *index, +static trx_t *lock_sec_rec_some_has_impl(trx_t *caller_trx, const rec_t *rec, + dict_index_t *index, const ulint *offsets) { trx_t *trx; trx_id_t max_trx_id; @@ -888,7 +890,7 @@ static trx_t *lock_sec_rec_some_has_impl(const rec_t *rec, dict_index_t *index, x-lock. We have to look in the clustered index. */ } else { - trx = row_vers_impl_x_locked(rec, index, offsets); + trx = row_vers_impl_x_locked(caller_trx, rec, index, offsets); } return (trx); @@ -912,14 +914,14 @@ static bool lock_rec_other_trx_holds_expl(ulint precise_mode, const trx_t *trx, /* We will inspect locks from various shards when inspecting transactions. */ locksys::Global_exclusive_latch_guard guard{}; - /* If trx_rw_is_active returns non-null impl_trx it only means that impl_trx + /* If rw_trx_hash.find returns non-null impl_trx it only means that impl_trx was active at some moment during the call, but might already be in TRX_STATE_COMMITTED_IN_MEMORY when we execute the body of the if. However, we hold exclusive latch on whole lock_sys, which prevents anyone from creating any new explicit locks. So, all explicit locks we will see must have been created at the time when the transaction was not committed yet. 
*/ - if (trx_t *impl_trx = trx_rw_is_active(trx->id, nullptr, false)) { + if (trx_t *impl_trx = trx_sys->rw_trx_hash.find(trx->id, false)) { ulint heap_no = page_rec_get_heap_no(rec); mutex_enter(&trx_sys->mutex); @@ -5077,29 +5079,32 @@ static void rec_queue_validate_latched(const buf_block_t *block, trx_id = lock_clust_rec_some_has_impl(rec, index, offsets); - const trx_t *impl_trx = trx_rw_is_active_low(trx_id, nullptr); + trx_t *impl_trx = trx_sys->rw_trx_hash.find(trx_id, true); if (impl_trx != nullptr) { + ut_ad(impl_trx->is_referenced()); ut_ad(owns_page_shard(block->get_page_id())); ut_ad(trx_sys_mutex_own()); - /* impl_trx cannot become TRX_STATE_COMMITTED_IN_MEMORY nor removed from - rw_trx_set until we release trx_sys->mutex, which means that currently all - other threads in the system consider this impl_trx active and thus should - respect implicit locks held by impl_trx*/ - const lock_t *other_lock = - lock_rec_other_has_expl_req(LOCK_S, block, true, heap_no, impl_trx); - - /* The impl_trx is holding an implicit lock on the - given record 'rec'. So there cannot be another - explicit granted lock. Also, there can be another - explicit waiting lock only if the impl_trx has an - explicit granted lock. */ - - if (other_lock != nullptr) { - ut_a(lock_get_wait(other_lock)); - ut_a(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, block, heap_no, - impl_trx)); + /* trx->mutex is required. See trx_release_impl_and_expl_locks */ + trx_mutex_enter(impl_trx); + if (!trx_state_eq(impl_trx, TRX_STATE_COMMITTED_IN_MEMORY)) { + const lock_t *other_lock = + lock_rec_other_has_expl_req(LOCK_S, block, true, heap_no, impl_trx); + + /* The impl_trx is holding an implicit lock on the + given record 'rec'. So there cannot be another + explicit granted lock. Also, there can be another + explicit waiting lock only if the impl_trx has an + explicit granted lock. 
*/ + + if (other_lock != nullptr) { + ut_a(lock_get_wait(other_lock)); + ut_a(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, block, heap_no, + impl_trx)); + } } + trx_mutex_exit(impl_trx); + impl_trx->release_reference(); } } @@ -5483,7 +5488,7 @@ static void lock_rec_convert_impl_to_expl_for_trx( trx_t *trx, /*!< in/out: active transaction */ ulint heap_no) /*!< in: rec heap number to lock */ { - ut_ad(trx_is_referenced(trx)); + ut_ad(trx->is_referenced()); DEBUG_SYNC_C("before_lock_rec_convert_impl_to_expl_for_trx"); { @@ -5526,18 +5531,19 @@ static void lock_rec_convert_impl_to_expl_for_trx( trx_mutex_exit(trx); } - trx_release_reference(trx); + trx->release_reference(); DEBUG_SYNC_C("after_lock_rec_convert_impl_to_expl_for_trx"); } /** If a transaction has an implicit x-lock on a record, but no explicit x-lock set on the record, sets one for it. +@param[in,out] caller_trx trx of current thread @param[in] block buffer block of rec @param[in] rec user record on page @param[in] index index of record @param[in] offsets rec_get_offsets(rec, index) */ -static void lock_rec_convert_impl_to_expl(const buf_block_t *block, +static void lock_rec_convert_impl_to_expl(trx_t* caller_trx, const buf_block_t *block, const rec_t *rec, dict_index_t *index, const ulint *offsets) { trx_t *trx; @@ -5554,11 +5560,11 @@ static void lock_rec_convert_impl_to_expl(const buf_block_t *block, trx_id = lock_clust_rec_some_has_impl(rec, index, offsets); - trx = trx_rw_is_active(trx_id, nullptr, true); + trx = trx_sys->rw_trx_hash.find(caller_trx, trx_id, true); } else { ut_ad(!dict_index_is_online_ddl(index)); - trx = lock_sec_rec_some_has_impl(rec, index, offsets); + trx = lock_sec_rec_some_has_impl(caller_trx, rec, index, offsets); if (trx) { DEBUG_SYNC_C("lock_rec_convert_impl_to_expl_will_validate"); ut_ad(!lock_rec_other_trx_holds_expl(LOCK_S | LOCK_REC_NOT_GAP, trx, rec, @@ -5569,7 +5575,7 @@ static void lock_rec_convert_impl_to_expl(const buf_block_t *block, if (trx != nullptr) { ulint 
heap_no = page_rec_get_heap_no(rec); - ut_ad(trx_is_referenced(trx)); + ut_ad(trx->is_referenced()); /* If the transaction is still active and has no explicit x-lock set on the record, set one for it. @@ -5584,7 +5590,7 @@ void lock_rec_convert_active_impl_to_expl(const buf_block_t *block, const rec_t *rec, dict_index_t *index, const ulint *offsets, trx_t *trx, ulint heap_no) { - trx_reference(trx, true); + trx->reference(); lock_rec_convert_impl_to_expl_for_trx(block, rec, index, offsets, trx, heap_no); } @@ -5624,7 +5630,7 @@ dberr_t lock_clust_rec_modify_check_and_lock( /* If a transaction has no explicit x-lock set on the record, set one for it */ - lock_rec_convert_impl_to_expl(block, rec, index, offsets); + lock_rec_convert_impl_to_expl(thr_get_trx(thr), block, rec, index, offsets); { locksys::Shard_latch_guard guard{block->get_page_id()}; @@ -5733,7 +5739,7 @@ dberr_t lock_sec_rec_read_check_and_lock( if ((page_get_max_trx_id(block->frame) >= trx_rw_min_trx_id() || recv_recovery_is_on()) && !page_rec_is_supremum(rec)) { - lock_rec_convert_impl_to_expl(block, rec, index, offsets); + lock_rec_convert_impl_to_expl(thr_get_trx(thr), block, rec, index, offsets); } { locksys::Shard_latch_guard guard{block->get_page_id()}; @@ -5782,7 +5788,7 @@ dberr_t lock_clust_rec_read_check_and_lock( heap_no = page_rec_get_heap_no(rec); if (heap_no != PAGE_HEAP_NO_SUPREMUM) { - lock_rec_convert_impl_to_expl(block, rec, index, offsets); + lock_rec_convert_impl_to_expl(thr_get_trx(thr), block, rec, index, offsets); } DEBUG_SYNC_C("after_lock_clust_rec_read_check_and_lock_impl_to_expl"); @@ -6242,8 +6248,8 @@ void lock_trx_release_locks(trx_t *trx) /*!< in/out: transaction */ check_trx_state(trx); ut_ad(trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY)); - if (trx_is_referenced(trx)) { - while (trx_is_referenced(trx)) { + if (trx->is_referenced()) { + while (trx->is_referenced()) { trx_mutex_exit(trx); DEBUG_SYNC_C("waiting_trx_is_not_referenced"); @@ -6256,7 +6262,7 @@ void 
lock_trx_release_locks(trx_t *trx) /*!< in/out: transaction */ } } - ut_ad(!trx_is_referenced(trx)); + ut_ad(!trx->is_referenced()); /* If the background thread trx_rollback_or_clean_recovered() is still active then there is a chance that the rollback diff --git a/storage/innobase/read/read0read.cc b/storage/innobase/read/read0read.cc index 08503a76175a..4d4925738824 100644 --- a/storage/innobase/read/read0read.cc +++ b/storage/innobase/read/read0read.cc @@ -356,6 +356,7 @@ MVCC::~MVCC() { Copy the transaction ids from the source vector */ void ReadView::copy_trx_ids(const trx_ids_t &trx_ids) { + ut_ad(mutex_own(&trx_sys->mutex)); ulint size = trx_ids.size(); if (m_creator_trx_id > 0) { @@ -410,13 +411,25 @@ void ReadView::copy_trx_ids(const trx_ids_t &trx_ids) { m_up_limit_id = m_ids.front(); #ifdef UNIV_DEBUG - /* Assert that all transaction ids in list are active. */ - for (trx_ids_t::const_iterator it = trx_ids.begin(); it != trx_ids.end(); - ++it) { - trx_t *trx = trx_get_rw_trx_by_id(*it); - ut_ad(trx != nullptr); - ut_ad(trx->state == TRX_STATE_ACTIVE || trx->state == TRX_STATE_PREPARED); - } + /* Original assertion was here to make sure that rw_trx_ids and + rw_trx_hash are in sync and they hold either ACTIVE or PREPARED + transaction. + + Now rw_trx_hash.find() does + ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE) || + trx_state_eq(trx, TRX_STATE_PREPARED)). + No need to repeat it here. We can't even repeat it here: it'll be race + condition because we need trx->element->mutex locked to perform this + check (see how it is done in find()). + + Now rw_trx_ids and rw_trx_hash may get out of sync for a short while: + when transaction is registered it first gets added into rw_trx_ids + under trx_sys->mutex protection and then to rw_trx_hash without mutex + protection. Thus we need to repeat this lookup.
*/ + for (trx_ids_t::const_iterator it = trx_ids.begin(); + it != trx_ids.end(); ++it) { + while (!trx_sys->rw_trx_hash.find(*it)); + } #endif /* UNIV_DEBUG */ } diff --git a/storage/innobase/row/row0row.cc b/storage/innobase/row/row0row.cc index 24ed70e4a599..fba14a3e2070 100644 --- a/storage/innobase/row/row0row.cc +++ b/storage/innobase/row/row0row.cc @@ -393,8 +393,7 @@ static inline dtuple_t *row_build_low(ulint type, const dict_index_t *index, times, and the cursor restore can happen multiple times for single insert or update statement. */ ut_a(!rec_offs_any_null_extern(rec, offsets) || - trx_rw_is_active(row_get_rec_trx_id(rec, index, offsets), nullptr, - false)); + trx_sys->rw_trx_hash.find(row_get_rec_trx_id(rec, index, offsets), false)); #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */ if (type != ROW_COPY_POINTERS) { diff --git a/storage/innobase/row/row0sel.cc b/storage/innobase/row/row0sel.cc index 34f116cf7147..cd49d7088a96 100644 --- a/storage/innobase/row/row0sel.cc +++ b/storage/innobase/row/row0sel.cc @@ -752,9 +752,9 @@ static void row_sel_build_committed_vers_for_mysql( prebuilt->old_vers_heap = mem_heap_create(rec_offs_size(*offsets)); } - row_vers_build_for_semi_consistent_read(rec, mtr, clust_index, offsets, - offset_heap, prebuilt->old_vers_heap, - old_vers, vrow); + row_vers_build_for_semi_consistent_read( + prebuilt->trx, rec, mtr, clust_index, offsets, offset_heap, + prebuilt->old_vers_heap, old_vers, vrow); } /** Tests the conditions which determine when the index segment we are searching diff --git a/storage/innobase/row/row0vers.cc b/storage/innobase/row/row0vers.cc index 511ed3c39111..ecc5d0ac070c 100644 --- a/storage/innobase/row/row0vers.cc +++ b/storage/innobase/row/row0vers.cc @@ -203,6 +203,7 @@ and reports if it found a version which satisfies criterion specified by looking_for_match. If looking_for_match is true, it searches for a version which matches the secondary index record. 
Otherwise it searches for a version which does not match. +@param[in,out] caller_trx trx of current thread @param[in] looking_for_match are we looking for match? false means that we are looking for non-match @param[in] clust_index the clustered index @@ -228,12 +229,15 @@ does not match. looking_for_match to the given sec_rec is found among versions created by trx_id or the one version before them */ -static bool row_vers_find_matching( - bool looking_for_match, const dict_index_t *const clust_index, - const rec_t *const clust_rec, ulint *&clust_offsets, - const dict_index_t *const sec_index, const rec_t *const sec_rec, - const ulint *const sec_offsets, const ulint comp, const trx_id_t trx_id, - mtr_t *const mtr, mem_heap_t *&heap) { +static bool row_vers_find_matching(trx_t *caller_trx, bool looking_for_match, + const dict_index_t *const clust_index, + const rec_t *const clust_rec, + ulint *&clust_offsets, + const dict_index_t *const sec_index, + const rec_t *const sec_rec, + const ulint *const sec_offsets, + const ulint comp, const trx_id_t trx_id, + mtr_t *const mtr, mem_heap_t *&heap) { const rec_t *version = clust_rec; trx_id_t version_trx_id = trx_id; @@ -257,7 +261,7 @@ static bool row_vers_find_matching( delete-marked, because we never start a transaction by inserting a delete-marked record. */ ut_ad(prev_version || !rec_get_deleted_flag(version, comp) || - !trx_rw_is_active(trx_id, nullptr, false)); + !trx_sys->rw_trx_hash.find(caller_trx, trx_id, false)); /* Free version and clust_offsets. */ mem_heap_free(old_heap); @@ -284,6 +288,7 @@ static bool row_vers_find_matching( /** Finds out if an active transaction has inserted or modified a secondary index record. + @param[in,out] caller_trx trx of current thread @param[in] clust_rec clustered index record @param[in] clust_index the clustered index @param[in] sec_rec secondary index record @@ -295,14 +300,13 @@ static bool row_vers_find_matching( negatives.
The caller must confirm all positive results by calling checking if the trx is still active.*/ UNIV_INLINE -trx_t *row_vers_impl_x_locked_low(const rec_t *const clust_rec, +trx_t *row_vers_impl_x_locked_low(trx_t* caller_trx, const rec_t *const clust_rec, const dict_index_t *const clust_index, const rec_t *const sec_rec, const dict_index_t *const sec_index, const ulint *const sec_offsets, mtr_t *const mtr) { trx_id_t trx_id; - ibool corrupt; ulint comp; ulint *clust_offsets; @@ -498,13 +502,15 @@ trx_t *row_vers_impl_x_locked_low(const rec_t *const clust_rec, rec_get_offsets(clust_rec, clust_index, nullptr, ULINT_UNDEFINED, &heap); trx_id = row_get_rec_trx_id(clust_rec, clust_index, clust_offsets); - corrupt = FALSE; - trx_t *trx = trx_rw_is_active(trx_id, &corrupt, true); + trx_t *trx = trx_sys->rw_trx_hash.find(caller_trx, trx_id, true); if (trx == nullptr) { /* The transaction that modified or inserted clust_rec is no longer active, or it is corrupt: no implicit lock on rec */ + trx_sys_mutex_enter(); + ibool corrupt = trx_id >= trx_sys->max_trx_id; + trx_sys_mutex_exit(); if (corrupt) { lock_report_trx_id_insanity(trx_id, clust_rec, clust_index, clust_offsets, trx_sys_get_max_trx_id()); @@ -520,10 +526,10 @@ trx_t *row_vers_impl_x_locked_low(const rec_t *const clust_rec, bool looking_for_match = rec_get_deleted_flag(sec_rec, comp); - if (!row_vers_find_matching(looking_for_match, clust_index, clust_rec, - clust_offsets, sec_index, sec_rec, sec_offsets, - comp, trx_id, mtr, heap)) { - trx_release_reference(trx); + if (!row_vers_find_matching(caller_trx, looking_for_match, clust_index, + clust_rec, clust_offsets, sec_index, sec_rec, + sec_offsets, comp, trx_id, mtr, heap)) { + trx->release_reference(); trx = nullptr; } @@ -533,8 +539,8 @@ trx_t *row_vers_impl_x_locked_low(const rec_t *const clust_rec, return trx; } -trx_t *row_vers_impl_x_locked(const rec_t *rec, const dict_index_t *index, - const ulint *offsets) { +trx_t *row_vers_impl_x_locked(trx_t 
*caller_trx, const rec_t *rec, + const dict_index_t *index, const ulint *offsets) { mtr_t mtr; trx_t *trx; const rec_t *clust_rec; @@ -570,10 +576,10 @@ trx_t *row_vers_impl_x_locked(const rec_t *rec, const dict_index_t *index, trx = nullptr; } else { - trx = row_vers_impl_x_locked_low(clust_rec, clust_index, rec, index, - offsets, &mtr); + trx = row_vers_impl_x_locked_low(caller_trx, clust_rec, clust_index, rec, + index, offsets, &mtr); - ut_ad(trx == nullptr || trx_is_referenced(trx)); + ut_ad(trx == nullptr || trx->is_referenced()); } mtr_commit(&mtr); @@ -1355,6 +1361,7 @@ dberr_t row_vers_build_for_consistent_read( /** Constructs the last committed version of a clustered index record, which should be seen by a semi-consistent read. */ void row_vers_build_for_semi_consistent_read( + trx_t *caller_trx, /*!< in/out: trx of current thread */ const rec_t *rec, /*!< in: record in a clustered index; the caller must have a latch on the page; this latch locks the top of the stack of versions @@ -1391,7 +1398,6 @@ void row_vers_build_for_semi_consistent_read( ut_ad(!vrow || !(*vrow)); for (;;) { - const trx_t *version_trx; mem_heap_t *heap2; rec_t *prev_version; trx_id_t version_trx_id; @@ -1401,19 +1407,7 @@ void row_vers_build_for_semi_consistent_read( rec_trx_id = version_trx_id; } - trx_sys_mutex_enter(); - version_trx = trx_get_rw_trx_by_id(version_trx_id); - /* Because version_trx is a read-write transaction, - its state cannot change from or to NOT_STARTED while - we are holding the trx_sys->mutex. It may change from - ACTIVE to PREPARED or COMMITTED. */ - if (version_trx && - trx_state_eq(version_trx, TRX_STATE_COMMITTED_IN_MEMORY)) { - version_trx = nullptr; - } - trx_sys_mutex_exit(); - - if (!version_trx) { + if (!trx_sys->rw_trx_hash.find(caller_trx, version_trx_id)) { committed_version_trx: /* We found a version that belongs to a committed transaction: return it. 
*/ diff --git a/storage/innobase/sync/sync0debug.cc b/storage/innobase/sync/sync0debug.cc index 5c243345d9a9..e8f2d21ddeef 100644 --- a/storage/innobase/sync/sync0debug.cc +++ b/storage/innobase/sync/sync0debug.cc @@ -444,6 +444,7 @@ LatchDebug::LatchDebug() { LEVEL_MAP_INSERT(SYNC_THREADS); LEVEL_MAP_INSERT(SYNC_TRX); LEVEL_MAP_INSERT(SYNC_TRX_SYS); + LEVEL_MAP_INSERT(SYNC_RW_TRX_HASH_ELEMENT); LEVEL_MAP_INSERT(SYNC_LOCK_SYS_GLOBAL); LEVEL_MAP_INSERT(SYNC_LOCK_SYS_SHARDED); LEVEL_MAP_INSERT(SYNC_LOCK_WAIT_SYS); @@ -700,6 +701,7 @@ Latches *LatchDebug::check_order(const latch_t *latch, case SYNC_THREADS: case SYNC_LOCK_SYS_GLOBAL: case SYNC_LOCK_WAIT_SYS: + case SYNC_RW_TRX_HASH_ELEMENT: case SYNC_TRX_SYS: case SYNC_IBUF_BITMAP_MUTEX: case SYNC_TEMP_SPACE_RSEG: @@ -1491,6 +1493,9 @@ static void sync_latch_meta_init() UNIV_NOTHROW { LATCH_ADD_MUTEX(REDO_LOG_ARCHIVE_QUEUE_MUTEX, SYNC_NO_ORDER_CHECK, PFS_NOT_INSTRUMENTED); + LATCH_ADD_MUTEX(RW_TRX_HASH_ELEMENT, SYNC_RW_TRX_HASH_ELEMENT, + rw_trx_hash_element_mutex_key); + LATCH_ADD_MUTEX(DBLWR, SYNC_DBLWR, dblwr_mutex_key); LATCH_ADD_MUTEX(TEST_MUTEX, SYNC_NO_ORDER_CHECK, PFS_NOT_INSTRUMENTED); diff --git a/storage/innobase/sync/sync0sync.cc b/storage/innobase/sync/sync0sync.cc index a3f853b6621c..44daff6fdfaf 100644 --- a/storage/innobase/sync/sync0sync.cc +++ b/storage/innobase/sync/sync0sync.cc @@ -135,6 +135,7 @@ mysql_pfs_key_t event_manager_mutex_key; mysql_pfs_key_t sync_array_mutex_key; mysql_pfs_key_t zip_pad_mutex_key; mysql_pfs_key_t row_drop_list_mutex_key; +mysql_pfs_key_t rw_trx_hash_element_mutex_key; mysql_pfs_key_t file_open_mutex_key; mysql_pfs_key_t master_key_id_mutex_key; mysql_pfs_key_t clone_sys_mutex_key; diff --git a/storage/innobase/trx/trx0sys.cc b/storage/innobase/trx/trx0sys.cc index 33a85e120fa6..a9d388db35aa 100644 --- a/storage/innobase/trx/trx0sys.cc +++ b/storage/innobase/trx/trx0sys.cc @@ -520,7 +520,7 @@ void trx_sys_create(void) { new (&trx_sys->rw_trx_ids) 
trx_ids_t(ut_allocator<trx_id_t>(mem_key_trx_sys_t_rw_trx_ids)); - new (&trx_sys->rw_trx_set) TrxIdSet(); + trx_sys->rw_trx_hash.init(); new (&trx_sys->rsegs) Rsegs(); trx_sys->rsegs.set_empty(); @@ -586,7 +586,7 @@ void trx_sys_close(void) { trx_sys->rw_trx_ids.~trx_ids_t(); - trx_sys->rw_trx_set.~TrxIdSet(); + trx_sys->rw_trx_hash.destroy(); ut_free(trx_sys); diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc index 832b901fe006..a95d097e5c89 100644 --- a/storage/innobase/trx/trx0trx.cc +++ b/storage/innobase/trx/trx0trx.cc @@ -253,6 +253,8 @@ struct TrxFactory { new (&trx->lock.table_locks) lock_pool_t(); + trx->rw_trx_hash_pins = nullptr; + trx_init(trx); trx->state = TRX_STATE_NOT_STARTED; @@ -448,6 +450,7 @@ static trx_t *trx_create_low() { /* We just got trx from pool, it should be non locking */ ut_ad(trx->will_lock == 0); + ut_ad(!trx->rw_trx_hash_pins); trx->persists_gtid = false; @@ -485,6 +488,7 @@ Release a trx_t instance back to the pool. static void trx_free(trx_t *&trx) { assert_trx_is_free(trx); + trx_sys->rw_trx_hash.put_pins(trx); trx->mysql_thd = nullptr; // FIXME: We need to avoid this heap free/alloc for each commit. @@ -675,7 +679,10 @@ static void trx_resurrect_table_ids(trx_t *trx, const trx_undo_ptr_t *undo_ptr, ut_ad(undo == undo_ptr->insert_undo || undo == undo_ptr->update_undo); - if (trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY) || undo->empty) { + ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE) || + trx_state_eq(trx, TRX_STATE_PREPARED)); + + if (undo->empty) { return; } @@ -937,10 +944,31 @@ static void trx_resurrect_update( } } +/** Mapping read-write transactions from id to transaction instance, for +creating read views and during trx id lookup for MVCC and locking.
*/ +struct TrxTrack { + explicit TrxTrack(trx_id_t id, trx_t *trx = nullptr) : m_id(id), m_trx(trx) { + // Do nothing + } + + trx_id_t m_id; + trx_t *m_trx; +}; + +/** +Comparator for TrxIdSet */ +struct TrxTrackCmp { + bool operator()(const TrxTrack &lhs, const TrxTrack &rhs) const { + return (lhs.m_id < rhs.m_id); + } +}; + +using TrxIdSet = std::set<TrxTrack, TrxTrackCmp, ut_allocator<TrxTrack>>; + /** Resurrect the transactions that were doing inserts and updates at the time of a crash, they need to be undone. @param[in] rseg rollback segment */ -static void trx_resurrect(trx_rseg_t *rseg) { +static void trx_resurrect(trx_rseg_t *rseg, TrxIdSet& set) { trx_t *trx; trx_undo_t *undo; @@ -951,7 +979,8 @@ static void trx_resurrect(trx_rseg_t *rseg) { undo = UT_LIST_GET_NEXT(undo_list, undo)) { trx = trx_resurrect_insert(undo, rseg); - trx_sys_rw_trx_add(trx); + set.insert(TrxTrack(trx->id, trx)); + ut_d(trx->in_rw_trx_list = true); trx_resurrect_table_ids(trx, &trx->rsegs.m_redo, undo); } @@ -959,12 +988,9 @@ static void trx_resurrect(trx_rseg_t *rseg) { /* Ressurrect transactions that were doing updates. */ for (undo = UT_LIST_GET_FIRST(rseg->update_undo_list); undo != nullptr; undo = UT_LIST_GET_NEXT(undo_list, undo)) { - /* Check the trx_sys->rw_trx_set first. */ - trx_sys_mutex_enter(); - - trx_t *trx = trx_get_rw_trx_by_id(undo->trx_id); - - trx_sys_mutex_exit(); + /* Check if trx_id was already registered first. */ + TrxIdSet::iterator it = set.find(TrxTrack(undo->trx_id)); + trx_t *trx = it == set.end() ? nullptr : it->m_trx; if (trx == nullptr) { trx = trx_allocate_for_background(); @@ -975,7 +1001,8 @@ static void trx_resurrect(trx_rseg_t *rseg) { trx_resurrect_update(trx, undo, rseg); - trx_sys_rw_trx_add(trx); + set.insert(TrxTrack(trx->id, trx)); + ut_d(trx->in_rw_trx_list = true); trx_resurrect_table_ids(trx, &trx->rsegs.m_redo, undo); } @@ -987,12 +1014,13 @@ static void trx_resurrect(trx_rseg_t *rseg) { transactions to be rolled back or cleaned up are built based on the undo log lists.
*/ void trx_lists_init_at_db_start(void) { + TrxIdSet set; ut_a(srv_is_being_started); /* Look through the rollback segments in the TRX_SYS for transaction undo logs. */ for (auto rseg : trx_sys->rsegs) { - trx_resurrect(rseg); + trx_resurrect(rseg, set); } /* Look through the rollback segments in each RSEG_ARRAY for @@ -1001,20 +1029,24 @@ void trx_lists_init_at_db_start(void) { for (auto undo_space : undo::spaces->m_spaces) { undo_space->rsegs()->s_lock(); for (auto rseg : *undo_space->rsegs()) { - trx_resurrect(rseg); + trx_resurrect(rseg, set); } undo_space->rsegs()->s_unlock(); } undo::spaces->s_unlock(); - TrxIdSet::iterator end = trx_sys->rw_trx_set.end(); + TrxIdSet::iterator end = set.end(); - for (TrxIdSet::iterator it = trx_sys->rw_trx_set.begin(); it != end; ++it) { + for (TrxIdSet::iterator it = set.begin(); it != end; ++it) { ut_ad(it->m_trx->in_rw_trx_list); - if (it->m_trx->state == TRX_STATE_ACTIVE || - it->m_trx->state == TRX_STATE_PREPARED) { - trx_sys->rw_trx_ids.push_back(it->m_id); + auto trx = it->m_trx; + if (trx_state_eq(trx, TRX_STATE_ACTIVE) || + trx_state_eq(trx, TRX_STATE_PREPARED)) + { + trx_sys->rw_trx_hash.insert(trx); + trx_sys->rw_trx_hash.put_pins(trx); + trx_sys->rw_trx_ids.push_back(trx->id); } UT_LIST_ADD_FIRST(trx_sys->rw_trx_list, it->m_trx); @@ -1192,9 +1224,9 @@ void trx_assign_rseg_temp(trx_t *trx) { trx_sys->rw_trx_ids.push_back(trx->id); - trx_sys->rw_trx_set.insert(TrxTrack(trx->id, trx)); - mutex_exit(&trx_sys->mutex); + + trx_sys->rw_trx_hash.insert(trx); } } @@ -1264,10 +1296,14 @@ static void trx_start_low( ut_ad(!trx->in_rw_trx_list); - /* We tend to over assert and that complicates the code somewhat. - e.g., the transaction state can be set earlier but we are forced to - set it under the protection of the trx_sys_t::mutex because some - trx list assertions are triggered unnecessarily. 
*/ + /* No other thread can access this trx object through rw_trx_hash, thus + we don't need trx_sys->mutex protection for that purpose. Still this + trx can be found through trx_sys->mysql_trx_list, which means state + change must be protected by e.g. trx->mutex. + + For now we update it without mutex protection, because original code + did it this way. */ + trx->state = TRX_STATE_ACTIVE; /* By default all transactions are in the read-only list unless they are non-locking auto-commit read only transactions or background @@ -1288,8 +1324,6 @@ static void trx_start_low( trx_sys->rw_trx_ids.push_back(trx->id); - trx_sys_rw_trx_add(trx); - ut_ad(trx->rsegs.m_redo.rseg != nullptr || srv_read_only_mode || srv_force_recovery >= SRV_FORCE_NO_TRX_UNDO); @@ -1297,12 +1331,11 @@ static void trx_start_low( ut_d(trx->in_rw_trx_list = true); - trx->state = TRX_STATE_ACTIVE; - ut_ad(trx_sys_validate_trx_list()); trx_sys_mutex_exit(); + trx_sys->rw_trx_hash.insert(trx); } else { trx->id = 0; @@ -1320,16 +1353,12 @@ static void trx_start_low( trx_sys->rw_trx_ids.push_back(trx->id); - trx_sys->rw_trx_set.insert(TrxTrack(trx->id, trx)); - trx_sys_mutex_exit(); - } - - trx->state = TRX_STATE_ACTIVE; + trx_sys->rw_trx_hash.insert(trx); + } } else { ut_ad(!read_write); - trx->state = TRX_STATE_ACTIVE; } } @@ -1733,8 +1762,6 @@ static void trx_erase_lists(trx_t *trx, bool serialised, Gtid_desc &gtid_desc) { } } - trx_sys->rw_trx_set.erase(TrxTrack(trx->id)); - /* Set minimal active trx id. */ trx_id_t min_id = trx_sys->rw_trx_ids.empty() ? trx_sys->max_trx_id : trx_sys->rw_trx_ids.front(); @@ -1771,31 +1798,27 @@ static void trx_release_impl_and_expl_locks(trx_t *trx, bool serialized) { --trx_sys->n_prepared_trx; } + if (trx_sys_latch_is_needed) { + trx_sys_mutex_exit(); + } + + if (trx->id > 0) { + trx_sys->rw_trx_hash.erase(trx); + } + trx_mutex_enter(trx); /* Please consider this particular point in time as the moment the trx's implicit locks become released.
- This change is protected by both trx_sys->mutex and trx->mutex. - Therefore, there are two secure ways to check if the trx still can hold - implicit locks: - (1) if you only know id of the trx, then you can obtain trx_sys->mutex and - check if trx is still in rw_trx_set. This works, because the call to - trx_erase_list() which removes trx from this list several lines above is - also protected by trx_sys->mutex. We use this approach in - lock_rec_convert_impl_to_expl() by using trx_rw_is_active() - (2) if you have pointer to trx, and you know it is safe to access (say, you - hold reference to this trx which prevents it from being freed) then you - can obtain trx->mutex and check if trx->state is equal to - TRX_STATE_COMMITTED_IN_MEMORY. We use this approach in - lock_rec_convert_impl_to_expl_for_trx() when deciding for the final time - if we really want to create explicit lock on behalf of implicit lock - holder. */ + if you have pointer to trx, and you know it is safe to access (say, you + hold reference to this trx which prevents it from being freed) then you + can obtain trx->mutex and check if trx->state is equal to + TRX_STATE_COMMITTED_IN_MEMORY. We use this approach in + lock_rec_convert_impl_to_expl_for_trx() when deciding for the final time + if we really want to create explicit lock on behalf of implicit lock + holder. */ trx->state = TRX_STATE_COMMITTED_IN_MEMORY; trx_mutex_exit(trx); - if (trx_sys_latch_is_needed) { - trx_sys_mutex_exit(); - } - lock_trx_release_locks(trx); } @@ -2489,7 +2512,7 @@ void trx_print_low(FILE *f, void trx_print_latched(FILE *f, const trx_t *trx, ulint max_query_len) { /* We need exclusive access to lock_sys for lock_number_of_rows_locked(), - and accessing trx->lock fields without trx->mutex.*/ + and accessing trx->lock fields without trx->mutex. 
*/ ut_ad(locksys::owns_exclusive_global_latch()); ut_ad(trx_sys_mutex_own()); @@ -2650,7 +2673,7 @@ static thread_local int32_t trx_latched_count = 0; static thread_local bool trx_allowed_two_latches = false; void trx_before_mutex_enter(const trx_t *trx, bool first_of_two) { - if (0 == trx_latched_count++) { + if (trx_latched_count++ == 0) { ut_a(trx_first_latched_trx == nullptr); trx_first_latched_trx = trx; if (first_of_two) { @@ -2673,8 +2696,8 @@ void trx_before_mutex_enter(const trx_t *trx, bool first_of_two) { } } void trx_before_mutex_exit(const trx_t *trx) { - ut_a(0 < trx_latched_count); - if (0 == --trx_latched_count) { + ut_a(trx_latched_count > 0); + if (--trx_latched_count == 0) { ut_a(trx_first_latched_trx == trx); trx_first_latched_trx = nullptr; trx_allowed_two_latches = false; @@ -3184,8 +3207,6 @@ void trx_set_rw_mode(trx_t *trx) /*!< in/out: transaction that is RW */ trx_sys->rw_trx_ids.push_back(trx->id); - trx_sys->rw_trx_set.insert(TrxTrack(trx->id, trx)); - /* So that we can see our own changes. */ if (MVCC::is_view_active(trx->read_view)) { MVCC::set_view_creator_trx_id(trx->read_view, trx->id); @@ -3196,6 +3217,8 @@ void trx_set_rw_mode(trx_t *trx) /*!< in/out: transaction that is RW */ ut_d(trx->in_rw_trx_list = true); mutex_exit(&trx_sys->mutex); + + trx_sys->rw_trx_hash.insert(trx); } void trx_kill_blocking(trx_t *trx) {