Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

规范代码 #140

Merged
merged 1 commit into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 85 additions & 94 deletions sql/msg_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ bool dbug_pq_worker_stall= 0;
MQ_RESULT MQueue_handle::send_bytes(uint32 nbytes, const void *data, uint32 *written, bool nowait)
{
uint32 used;
uint32 ringsize= m_queue->m_ring_size;
uint32 ringsize = m_queue->m_ring_size;
uint32 sent = 0, available;

/** only worker thread can send data to message queue */
THD *thd= current_thd;
THD *thd = current_thd;
DBUG_ASSERT(!m_queue->m_sender_event->PQ_caller
|| thd == m_queue->m_sender_event->PQ_caller);

Expand All @@ -53,25 +53,25 @@ MQ_RESULT MQueue_handle::send_bytes(uint32 nbytes, const void *data, uint32 *wri
{

/** atomically obtain the read position */
rb= atomic_read_u64(&m_queue->m_bytes_read);
rb = atomic_read_u64(&m_queue->m_bytes_read);
/** atomically obtain the write position */
wb= atomic_read_u64(&m_queue->m_bytes_written);
wb = atomic_read_u64(&m_queue->m_bytes_written);
DBUG_ASSERT(wb >= rb);

used= wb - rb;
used = wb - rb;
DBUG_ASSERT(used <= ringsize);
available= std::min(ringsize - used, nbytes - sent);
available = std::min(ringsize - used, nbytes - sent);

compiler_barrier();
if (m_queue->detached == MQ_HAVE_DETACHED)
{
*written= sent;
*written = sent;
return MQ_DETACHED;
}

if (m_queue->detached == MQ_TMP_DETACHED)
{
*written= sent;
*written = sent;
return MQ_SUCCESS;
}

Expand All @@ -86,9 +86,8 @@ MQ_RESULT MQueue_handle::send_bytes(uint32 nbytes, const void *data, uint32 *wri
/** notify receiver to receive data from MQ */
end_wait(m_queue->m_receiver_event);
/** blocking mode by default, i.e, nowait = false */
if (nowait)
{
*written= sent;
if (nowait) {
*written = sent;
return MQ_WOULD_BLOCK;
}
/** sender enters into the blocking wait status */
Expand All @@ -100,14 +99,14 @@ MQ_RESULT MQueue_handle::send_bytes(uint32 nbytes, const void *data, uint32 *wri
uint32 sent_once;

/** compute the real write position in ring array */
offset= MOD(wb, ringsize);
sent_once= std::min(available, ringsize - offset);
offset = MOD(wb, ringsize);
sent_once = std::min(available, ringsize - offset);

/** this barrier ensures that memcpy() is finished before end_wait() */
memory_barrier();
memcpy(&m_queue->m_buffer[offset],
reinterpret_cast<char*>(const_cast<void*>(data)) + sent, sent_once);
sent+= sent_once;
sent += sent_once;

/** atomically update the write position */
atomic_inc_bytes_written(sent_once);
Expand All @@ -121,14 +120,13 @@ MQ_RESULT MQueue_handle::send_bytes(uint32 nbytes, const void *data, uint32 *wri
#endif
}

if (thd->is_pq_error())
{
if (thd->is_pq_error()) {
set_datched_status(MQ_HAVE_DETACHED);
return MQ_DETACHED;
}

DBUG_ASSERT(sent == nbytes);
*written= sent;
*written = sent;
return MQ_SUCCESS;
}

Expand All @@ -146,23 +144,21 @@ MQ_RESULT MQueue_handle::send(const void *data, uint32 len, bool nowait)
{

MQ_RESULT res;
uint32 nbytes= len;
uint32 nbytes = len;
uint32 written;

/** (1) write the message length into MQ */
res= send_bytes(WORD_LENGTH, (char *)&nbytes, &written, nowait);
if (res != MQ_SUCCESS)
{
res = send_bytes(WORD_LENGTH, (char *)&nbytes, &written, nowait);
if (res != MQ_SUCCESS) {
DBUG_ASSERT(res == MQ_DETACHED);
return res;
}
DBUG_ASSERT((!written && m_queue->detached == MQ_TMP_DETACHED)
|| written == WORD_LENGTH);

/** (2) write the message data into MQ */
res= send_bytes(nbytes, data, &written, nowait);
if(res != MQ_SUCCESS)
{
res = send_bytes(nbytes, data, &written, nowait);
if(res != MQ_SUCCESS) {
DBUG_ASSERT(res == MQ_DETACHED);
return res;
}
Expand All @@ -174,21 +170,19 @@ MQ_RESULT MQueue_handle::send(const void *data, uint32 len, bool nowait)
/** sending message to MQ in a Field_raw_data manner */
MQ_RESULT MQueue_handle::send(Field_raw_data *fm)
{
MQ_RESULT res= MQ_SUCCESS;
MQ_RESULT res = MQ_SUCCESS;
uint32 written;

/** (s1) sending the variable-field's length_bytes */
if (fm->m_var_len)
{
res= send_bytes(1, (void *)&fm->m_var_len, &written);
if (fm->m_var_len) {
res = send_bytes(1, (void *)&fm->m_var_len, &written);
DBUG_ASSERT((res == MQ_SUCCESS && written == 1) || res == MQ_DETACHED
|| (!written && m_queue->detached == MQ_TMP_DETACHED));
}

/** (s2) sending the data of field->ptr */
if (MQ_SUCCESS == res)
{
res= send_bytes(fm->m_len, fm->m_ptr, &written);
if (MQ_SUCCESS == res) {
res = send_bytes(fm->m_len, fm->m_ptr, &written);
DBUG_ASSERT((res == MQ_SUCCESS && written == fm->m_len) || res == MQ_DETACHED
|| (!written && m_queue->detached == MQ_TMP_DETACHED));
}
Expand All @@ -208,38 +202,36 @@ MQ_RESULT MQueue_handle::receive_bytes(uint32 bytes_needed, uint32 *nbytesp, voi
uint64 rb, wb;
uint32 used, offset;

*nbytesp= 0;
uint32 ringsize= m_queue->m_ring_size;
THD* thd= current_thd;
*nbytesp = 0;
uint32 ringsize = m_queue->m_ring_size;
THD* thd = current_thd;
DBUG_ASSERT(!m_queue->m_receiver_event->PQ_caller
|| thd == m_queue->m_receiver_event->PQ_caller);
while(!thd->is_killed() && !thd->pq_error)
{
rb= atomic_read_u64(&m_queue->m_bytes_read) + m_consume_pending;
wb= atomic_read_u64(&m_queue->m_bytes_written);
while(!thd->is_killed() && !thd->pq_error) {
rb = atomic_read_u64(&m_queue->m_bytes_read) + m_consume_pending;
wb = atomic_read_u64(&m_queue->m_bytes_written);
DBUG_ASSERT(wb >= rb);

used= wb - rb;
used = wb - rb;
DBUG_ASSERT(used <= ringsize);
offset= MOD(rb, ringsize);
offset = MOD(rb, ringsize);

/** we have enough space and then directly read bytes_needed data into datap */
if (used >= bytes_needed)
{
/** (s1) read data located in [offset, ..., ringsize] */
if (offset + bytes_needed <= ringsize)
{
if (offset + bytes_needed <= ringsize) {
memcpy(datap, &m_queue->m_buffer[offset], bytes_needed);
} else {
/** (s2) read data located in [offset, ringsize], [0, bytes_needed - (ringsize - offset))] */
int part_1= ringsize - offset;
int part_2= bytes_needed - part_1;
int part_1 = ringsize - offset;
int part_2 = bytes_needed - part_1;

memcpy(datap, &m_queue->m_buffer[offset], part_1);
memcpy((char *)datap + part_1, &m_queue->m_buffer[0], part_2);
}

*nbytesp= bytes_needed;
*nbytesp = bytes_needed;
memory_barrier();

/** notify sender that there is available space in MQ */
Expand All @@ -251,10 +243,9 @@ MQ_RESULT MQueue_handle::receive_bytes(uint32 bytes_needed, uint32 *nbytesp, voi
* there are not enough data for receiver, and receiver only
* receives the data located into [offset, ringsize].
*/
if (offset + used >= ringsize)
{
if (offset + used >= ringsize) {
memcpy(datap, &m_queue->m_buffer[offset], ringsize - offset);
*nbytesp= ringsize - offset;
*nbytesp = ringsize - offset;

memory_barrier();
end_wait(m_queue->m_sender_event);
Expand All @@ -265,11 +256,11 @@ MQ_RESULT MQueue_handle::receive_bytes(uint32 bytes_needed, uint32 *nbytesp, voi
* if m_queue is detached and there are still data in m_queue,
* receiver can receive data until it reads all data.
*/
if (m_queue->detached == MQ_HAVE_DETACHED)
{
if (m_queue->detached == MQ_HAVE_DETACHED) {
read_barrier();
if(wb != atomic_read_u64(&m_queue->m_bytes_written))
if(wb != atomic_read_u64(&m_queue->m_bytes_written)) {
continue;
}
return MQ_DETACHED;
}

Expand All @@ -281,18 +272,19 @@ MQ_RESULT MQueue_handle::receive_bytes(uint32 bytes_needed, uint32 *nbytesp, voi
* }
* should be a group of atomic operation.
*/
if (m_consume_pending > 0)
{
offset= m_consume_pending;
m_consume_pending= 0;
if (m_consume_pending > 0) {
offset = m_consume_pending;
m_consume_pending = 0;

/** ensure that: consume_pending has written into memory */
memory_barrier();
atomic_inc_bytes_read(offset);
}

/** the blocking-read mode */
if (nowait) return MQ_WOULD_BLOCK;
if (nowait) {
return MQ_WOULD_BLOCK;
}

set_wait(m_queue->m_receiver_event);
reset_wait(m_queue->m_receiver_event);
Expand All @@ -310,18 +302,17 @@ MQ_RESULT MQueue_handle::receive(void **datap, uint32 *nbytesp, bool nowait)
{
MQ_RESULT res;
uint32 nbytes, offset;
uint32 rb= 0;
uint32 rb = 0;
/**
* only when m_consume_pending is greater than 1/4 * m_ring_size, we update the read position
* m_read_bytes using m_consume_pending; otherwise, the number of read bytes is firstly
* accumulated to m_consume_pending and then using (m_read_bytes + m_consume_pending) as the
* real read position in ring array.
*
*/
if (m_consume_pending > m_queue->m_ring_size / 4)
{
offset= m_consume_pending;
m_consume_pending= 0;
if (m_consume_pending > m_queue->m_ring_size / 4) {
offset = m_consume_pending;
m_consume_pending = 0;

memory_barrier();
atomic_inc_bytes_read(offset);
Expand All @@ -338,11 +329,12 @@ MQ_RESULT MQueue_handle::receive(void **datap, uint32 *nbytesp, bool nowait)
*/

/** (1) read the message length */
while (!m_length_word_complete)
{
while (!m_length_word_complete) {
DBUG_ASSERT(m_partial_bytes < WORD_LENGTH);
res= receive_bytes(WORD_LENGTH - m_partial_bytes, &rb, &m_buffer[m_partial_bytes], nowait);
if (res != MQ_SUCCESS) return res;
res = receive_bytes(WORD_LENGTH - m_partial_bytes, &rb, &m_buffer[m_partial_bytes], nowait);
if (res != MQ_SUCCESS) {
return res;
}

uint32x2_t v_a = {m_partial_bytes, m_consume_pending};
uint32x2_t v_b = {rb, rb};
Expand All @@ -351,58 +343,57 @@ MQ_RESULT MQueue_handle::receive(void **datap, uint32 *nbytesp, bool nowait)
m_partial_bytes = vget_lane_u32(v_a, 0);
m_consume_pending = vget_lane_u32(v_a, 1);

if (m_partial_bytes >= WORD_LENGTH)
{
if (m_partial_bytes >= WORD_LENGTH) {
DBUG_ASSERT(m_partial_bytes == WORD_LENGTH);
m_expected_bytes= *(uint32 *) m_buffer;
m_length_word_complete= true;
m_partial_bytes= 0;
m_expected_bytes = *(uint32 *) m_buffer;
m_length_word_complete = true;
m_partial_bytes = 0;
}
}
nbytes= m_expected_bytes;
nbytes = m_expected_bytes;

/** re-allocing local buffer when m_buffer_len is smaller than nbytes */
if (m_buffer_len < nbytes
|| DBUG_EVALUATE_IF("pq_mq_error3", true, false))
{
while (m_buffer_len < nbytes)
{
m_buffer_len*= 2;
|| DBUG_EVALUATE_IF("pq_mq_error3", true, false)) {
while (m_buffer_len < nbytes) {
m_buffer_len *= 2;
}
if (m_buffer) destroy(m_buffer);
THD *thd= current_thd;
if (m_buffer) {
destroy(m_buffer);
}
THD *thd = current_thd;
DBUG_ASSERT(!m_queue->m_receiver_event->PQ_caller
|| thd == m_queue->m_receiver_event->PQ_caller);
m_buffer= new (thd->pq_mem_root) char [m_buffer_len];
m_buffer = new (thd->pq_mem_root) char [m_buffer_len];
/** if m_buffer allocates fail, then directly return my_error */
if (!m_buffer || DBUG_EVALUATE_IF("pq_mq_error3", true, false))
{
if (m_buffer == nullptr || DBUG_EVALUATE_IF("pq_mq_error3", true, false)) {
my_error(ER_STD_BAD_ALLOC_ERROR, MYF(0), "", "(MQ::receive)");
return MQ_DETACHED;
}
}

/** (2) read data of nbytes **/
for (;;)
{
for (;;) {
size_t still_needed;
DBUG_ASSERT(m_partial_bytes <= nbytes);

still_needed= nbytes - m_partial_bytes;
res= receive_bytes(still_needed, &rb, &m_buffer[m_partial_bytes], nowait);
if (res != MQ_SUCCESS) return res;

m_partial_bytes+= rb;
m_consume_pending+= rb;
if (m_partial_bytes >= nbytes)
still_needed = nbytes - m_partial_bytes;
res = receive_bytes(still_needed, &rb, &m_buffer[m_partial_bytes], nowait);
if (res != MQ_SUCCESS) {
return res;
}
m_partial_bytes += rb;
m_consume_pending += rb;
if (m_partial_bytes >= nbytes) {
break;
}
}

/** reset for next read */
m_length_word_complete= false;
m_partial_bytes= 0;
m_length_word_complete = false;
m_partial_bytes = 0;

*nbytesp= nbytes;
*datap= m_buffer;
*nbytesp = nbytes;
*datap = m_buffer;
return MQ_SUCCESS;
}
Loading