Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MDEV-35465 Async replication stops working on Galera async replica node when parallel replication is enabled #456

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions mysql-test/suite/galera/r/galera_as_slave_parallel_retry.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
connection node_2;
connection node_1;
connect master, 127.0.0.1, root, , test, $NODE_MYPORT_3;
connect node_1_ctrl, 127.0.0.1, root, , test, $NODE_MYPORT_1;
connection node_1;
START SLAVE;
connection master;
CREATE TABLE t1 (f1 INT PRIMARY KEY) ENGINE=InnoDB;
connection node_1;
SET GLOBAL debug_dbug = '+d,rpl_parallel_simulate_temp_err_xid,sync.wsrep_retry_event_group';
connection master;
INSERT INTO t1 VALUES (1);
connection node_1_ctrl;
SET debug_sync = 'now WAIT_FOR sync.wsrep_retry_event_group_reached';
SET GLOBAL debug_dbug = '';
SET debug_sync = 'now SIGNAL signal.wsrep_retry_event_group';
connection node_1;
SET debug_sync = 'RESET';
connection master;
DROP TABLE t1;
connection node_1;
STOP SLAVE;
10 changes: 10 additions & 0 deletions mysql-test/suite/galera/t/galera_as_slave_parallel_retry.cnf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
!include ../galera_2nodes_as_slave.cnf

[mysqld]
log-bin=mysqld-bin
log-slave-updates
binlog-format=ROW

[mysqld.1]
slave-parallel-threads=2
slave-parallel-mode=optimistic
52 changes: 52 additions & 0 deletions mysql-test/suite/galera/t/galera_as_slave_parallel_retry.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# MDEV-35465 Async replication stops working on Galera async replica node
# when parallel replication is enabled

--source include/have_innodb.inc
--source include/have_log_bin.inc
--source include/galera_cluster.inc
--source include/have_debug_sync.inc

# Node 3 is not a Galera node, use it as a master
--connect master, 127.0.0.1, root, , test, $NODE_MYPORT_3
--connect node_1_ctrl, 127.0.0.1, root, , test, $NODE_MYPORT_1

--connection node_1
--disable_query_log
--eval CHANGE MASTER TO MASTER_HOST='127.0.0.1', MASTER_USER='root', MASTER_PORT=$NODE_MYPORT_3;
--enable_query_log
START SLAVE;

--connection master
CREATE TABLE t1 (f1 INT PRIMARY KEY) ENGINE=InnoDB;

--connection node_1
--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 't1'
--source include/wait_condition.inc

#
--let debug_dbug_orig = `SELECT @@GLOBAL.debug_dbug`
SET GLOBAL debug_dbug = '+d,rpl_parallel_simulate_temp_err_xid,sync.wsrep_retry_event_group';

--connection master
INSERT INTO t1 VALUES (1);

--connection node_1_ctrl
SET debug_sync = 'now WAIT_FOR sync.wsrep_retry_event_group_reached';
--eval SET GLOBAL debug_dbug = '$debug_dbug_orig'
SET debug_sync = 'now SIGNAL signal.wsrep_retry_event_group';

--let $wait_condition = SELECT COUNT(*) = 1 FROM t1;
--source include/wait_condition.inc


--connection node_1
SET debug_sync = 'RESET';

--connection master
DROP TABLE t1;

--connection node_1
--let $wait_condition = SELECT COUNT(*) = 0 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 't1'
--source include/wait_condition.inc

STOP SLAVE;
2 changes: 1 addition & 1 deletion sql/log_event_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5962,7 +5962,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
if (unlikely(open_and_lock_tables(thd, rgi->tables_to_lock, FALSE, 0)))
{
#ifdef WITH_WSREP
if (WSREP(thd))
if (WSREP(thd) && !thd->slave_thread)
{
WSREP_WARN("BF applier thread=%lu failed to open_and_lock_tables for "
"%s, fatal: %d "
Expand Down
28 changes: 19 additions & 9 deletions sql/rpl_parallel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -865,8 +865,13 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
err= 0;
errmsg= NULL;
#ifdef WITH_WSREP
thd->wsrep_cs().reset_error();
WSREP_DEBUG("retrying async replication event");
DBUG_EXECUTE_IF("sync.wsrep_retry_event_group", {
WSREP_INFO("sync.wsrep_retry_event_group");
const char act[]= "now "
"SIGNAL sync.wsrep_retry_event_group_reached "
"WAIT_FOR signal.wsrep_retry_event_group";
debug_sync_set_action(thd, STRING_WITH_LEN(act));
};);
#endif /* WITH_WSREP */

/*
Expand Down Expand Up @@ -1013,15 +1018,20 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
*/
thd->reset_killed();
#ifdef WITH_WSREP
if (wsrep_before_command(thd))
if (WSREP(thd))
{
WSREP_WARN("Parallel slave worker failed at wsrep_before_command() hook");
err= 1;
goto err;
/* Exec after statement hook to make sure that the failed transaction
* gets cleared and reset error state. */
if (wsrep_after_statement(thd))
{
WSREP_WARN("Parallel slave worker failed at wsrep_after_statement() hook");
err= 1;
goto err;
}
thd->wsrep_cs().reset_error();
wsrep_start_trx_if_not_started(thd);
WSREP_DEBUG("parallel slave retry, after trx start");
}
wsrep_start_trx_if_not_started(thd);
WSREP_DEBUG("parallel slave retry, after trx start");

#endif /* WITH_WSREP */
strmake_buf(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
Expand Down
11 changes: 8 additions & 3 deletions sql/wsrep_mysqld.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3909,11 +3909,16 @@ enum wsrep::streaming_context::fragment_unit wsrep_fragment_unit(ulong unit)

bool THD::wsrep_parallel_slave_wait_for_prior_commit()
{
if (rgi_slave && rgi_slave->is_parallel_exec && wait_for_prior_commit())
if (rgi_slave && rgi_slave->is_parallel_exec)
{
return 1;
wait_for_pending_deadlock_kill(this, rgi_slave);
if (rgi_slave->killed_for_retry) {
my_error(ER_LOCK_DEADLOCK, MYF(0));
return true;
}
return wait_for_prior_commit();
}
return 0;
return false;
}

/***** callbacks for wsrep service ************/
Expand Down