Skip to content

Commit

Permalink
[vio] PS-269: Initial Percona Server 8.0.12 tree
Browse files Browse the repository at this point in the history
  • Loading branch information
inikep authored and dlenev committed Oct 17, 2024
1 parent c11a23b commit 259513e
Show file tree
Hide file tree
Showing 12 changed files with 494 additions and 35 deletions.
50 changes: 47 additions & 3 deletions include/violite.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@

#include "my_config.h"

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#include <stddef.h>
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
Expand Down Expand Up @@ -65,6 +68,10 @@ struct Vio;
#include <atomic>
#endif

#if defined(__APPLE__)
#define s6_addr32 __u6_addr.__u6_addr32
#endif

#ifdef HAVE_PSI_INTERFACE
void init_vio_psi_keys();
#endif
Expand Down Expand Up @@ -160,7 +167,8 @@ MYSQL_VIO vio_new_win32shared_memory(HANDLE handle_file_map, HANDLE handle_map,
#endif /* _WIN32 */

void vio_delete(MYSQL_VIO vio);
int vio_shutdown(MYSQL_VIO vio);
int vio_shutdown(MYSQL_VIO vio, int how);
int vio_cancel(MYSQL_VIO vio, int how);
bool vio_reset(MYSQL_VIO vio, enum enum_vio_type type, my_socket sd, void *ssl,
uint flags);
bool vio_is_blocking(Vio *vio);
Expand All @@ -169,6 +177,20 @@ int vio_set_blocking_flag(Vio *vio, bool set_blocking_flag);
size_t vio_read(MYSQL_VIO vio, uchar *buf, size_t size);
size_t vio_read_buff(MYSQL_VIO vio, uchar *buf, size_t size);
size_t vio_write(MYSQL_VIO vio, const uchar *buf, size_t size);

struct st_vio_network {
union {
struct in_addr in;
struct in6_addr in6;
} addr;
union {
struct in_addr in;
struct in6_addr in6;
} mask;
sa_family_t family;
};

void vio_proxy_protocol_add(const st_vio_network &net) noexcept;
/* setsockopt TCP_NODELAY at IPPROTO_TCP level, when possible */
int vio_fastsend(MYSQL_VIO vio);
/* setsockopt SO_KEEPALIVE at SOL_SOCKET level, when possible */
Expand Down Expand Up @@ -198,6 +220,9 @@ ssize_t vio_pending(MYSQL_VIO vio);
#endif
/* Set timeout for a network operation. */
int vio_timeout(MYSQL_VIO vio, uint which, int timeout_sec);
extern void vio_set_wait_callback(void (*before_wait)(void),
void (*after_wait)(void));

/* Connect to a peer. */
bool vio_socket_connect(MYSQL_VIO vio, struct sockaddr *addr, socklen_t len,
bool nonblocking, int timeout,
Expand Down Expand Up @@ -293,7 +318,8 @@ void vio_end(void);
(vio)->viokeepalive(vio, set_keep_alive)
#define vio_should_retry(vio) (vio)->should_retry(vio)
#define vio_was_timeout(vio) (vio)->was_timeout(vio)
#define vio_shutdown(vio) ((vio)->vioshutdown)(vio)
#define vio_shutdown(vio, how) ((vio)->vioshutdown)(vio, how)
#define vio_cancel(vio, how) ((vio)->viocancel)(vio, how)
#define vio_peer_addr(vio, buf, prt, buflen) \
(vio)->peer_addr(vio, buf, prt, buflen)
#define vio_io_wait(vio, event, timeout) (vio)->io_wait(vio, event, timeout)
Expand All @@ -303,6 +329,18 @@ void vio_end(void);
#define vio_set_blocking_flag(vio, val) (vio)->set_blocking_flag(vio, val)
#endif /* !defined(DONT_MAP_VIO) */

#ifdef _WIN32
/*
Set thread id for io cancellation (required on Windows XP only,
and should to be removed if XP is no more supported)
*/

#define vio_set_thread_id(vio, tid) \
if (vio) vio->thread_id = tid
#else
#define vio_set_thread_id(vio, tid)
#endif

/* This enumerator is used in parser - should be always visible */
enum SSL_type {
SSL_TYPE_NOT_SPECIFIED = -1,
Expand Down Expand Up @@ -399,12 +437,18 @@ struct Vio {
further communications can take place, however any related buffers,
descriptors, handles can remain valid after a shutdown.
*/
int (*vioshutdown)(MYSQL_VIO) = {nullptr};
int (*vioshutdown)(MYSQL_VIO, int) = {nullptr};
/**
Partial shutdown. All the actions performed which shutdown performs,
but descriptor remains open and valid.
*/
int (*viocancel)(Vio *, int) = {nullptr};
bool (*is_connected)(MYSQL_VIO) = {nullptr};
bool (*has_data)(MYSQL_VIO) = {nullptr};
int (*io_wait)(MYSQL_VIO, enum enum_vio_io_event, int) = {nullptr};
bool (*connect)(MYSQL_VIO, struct sockaddr *, socklen_t, int) = {nullptr};
#ifdef _WIN32
DWORD thread_id; /* Used on XP only by vio_shutdown() */
#ifdef __clang__
OVERLAPPED overlapped = {0, 0, {{0, 0}}, nullptr};
#else
Expand Down
2 changes: 1 addition & 1 deletion plugin/x/src/ngs/vio_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ sockaddr_storage *Vio_wrapper::peer_addr(std::string *address, uint16_t *port) {

int Vio_wrapper::shutdown() {
MUTEX_LOCK(lock, m_shutdown_mutex);
return vio_shutdown(m_vio);
return vio_shutdown(m_vio, SHUT_RDWR);
}

Vio_wrapper::~Vio_wrapper() {
Expand Down
2 changes: 1 addition & 1 deletion sql/protocol_classic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3313,7 +3313,7 @@ int Protocol_classic::shutdown(bool) {
m_thd->net.vio->thread_id = m_thd->real_id;
}
#endif /* USE_PPOLL_IN_VIO */
return m_thd->net.vio ? vio_shutdown(m_thd->net.vio) : 0;
return m_thd->net.vio ? vio_shutdown(m_thd->net.vio, SHUT_RDWR) : 0;
}

bool Protocol_classic::store_string(const char *from, size_t length,
Expand Down
4 changes: 2 additions & 2 deletions sql/sql_class.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1967,7 +1967,7 @@ void THD::shutdown_active_vio() {
// invalid thread id.
active_vio->thread_id = real_id;
#endif /* USE_PPOLL_IN_VIO */
vio_shutdown(active_vio);
vio_shutdown(active_vio, SHUT_RDWR);
active_vio = nullptr;
m_SSL = nullptr;
}
Expand All @@ -1985,7 +1985,7 @@ void THD::shutdown_clone_vio() {
// invalid thread id.
clone_vio->thread_id = real_id;
#endif /* USE_PPOLL_IN_VIO */
vio_shutdown(clone_vio);
vio_shutdown(clone_vio, SHUT_RDWR);
clone_vio = nullptr;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ TEST_F(MySQLNetworkProviderTest, NewServerConnectionTest) {
// Vio *active_vio = (Vio *)malloc(sizeof(Vio));
MYSQL_VIO active_vio = vio_new(socket_to_use, VIO_TYPE_TCPIP, 0);
active_vio->mysql_socket.fd = socket_to_use;
active_vio->vioshutdown = [](Vio *) { return 0; };
active_vio->vioshutdown = [](Vio *, int) { return 0; };

fake_thd.set_active_vio(active_vio);

Expand Down
19 changes: 15 additions & 4 deletions vio/vio.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@
PSI_memory_key key_memory_vio_ssl_fd;
PSI_memory_key key_memory_vio;
PSI_memory_key key_memory_vio_read_buffer;
PSI_memory_key key_memory_vio_proxy_networks;

#ifdef HAVE_PSI_INTERFACE
static PSI_memory_info all_vio_memory[] = {
{&key_memory_vio_ssl_fd, "ssl_fd", 0, 0, PSI_DOCUMENT_ME},
{&key_memory_vio, "vio", 0, 0, PSI_DOCUMENT_ME},
{&key_memory_vio_read_buffer, "read_buffer", 0, 0, PSI_DOCUMENT_ME},
{&key_memory_vio_proxy_networks, "proxy_networks", 0, 0, PSI_DOCUMENT_ME},
};

void init_vio_psi_keys() {
Expand Down Expand Up @@ -113,6 +115,12 @@ Vio::Vio(uint flags) {
VIO_READ_BUFFER_SIZE, MYF(MY_WME));
}

#ifdef _WIN32
bool vio_shared_memory_has_data(Vio *vio) {
return (vio->shared_memory_remain > 0);
}
#endif

Vio::~Vio() {
my_free(read_buffer);
read_buffer = nullptr;
Expand Down Expand Up @@ -235,6 +243,7 @@ static bool vio_init(Vio *vio, enum enum_vio_type type, my_socket sd,
vio->should_retry = vio_should_retry;
vio->was_timeout = vio_was_timeout;
vio->vioshutdown = vio_shutdown_pipe;
vio->viocancel = vio_cancel_pipe;
vio->peer_addr = vio_peer_addr;
vio->io_wait = no_io_wait;
vio->is_connected = vio_is_connected_pipe;
Expand All @@ -255,17 +264,17 @@ static bool vio_init(Vio *vio, enum enum_vio_type type, my_socket sd,
vio->should_retry = vio_should_retry;
vio->was_timeout = vio_was_timeout;
vio->vioshutdown = vio_shutdown_shared_memory;
vio->viocancel = vio_cancel_shared_memory;
vio->peer_addr = vio_peer_addr;
vio->io_wait = no_io_wait;
vio->is_connected = vio_is_connected_shared_memory;
vio->has_data = has_no_data;
vio->has_data = vio_shared_memory_has_data;
vio->is_blocking = vio_is_blocking;
vio->set_blocking = vio_set_blocking;
vio->set_blocking_flag = vio_set_blocking_flag;
vio->is_blocking_flag = true;
break;
#endif /* _WIN32 */

case VIO_TYPE_SSL:
vio->viodelete = vio_ssl_delete;
vio->vioerrno = vio_errno;
Expand All @@ -276,6 +285,7 @@ static bool vio_init(Vio *vio, enum enum_vio_type type, my_socket sd,
vio->should_retry = vio_should_retry;
vio->was_timeout = vio_was_timeout;
vio->vioshutdown = vio_ssl_shutdown;
vio->viocancel = vio_cancel;
vio->peer_addr = vio_peer_addr;
vio->io_wait = vio_io_wait;
vio->is_connected = vio_is_connected;
Expand All @@ -297,6 +307,7 @@ static bool vio_init(Vio *vio, enum enum_vio_type type, my_socket sd,
vio->should_retry = vio_should_retry;
vio->was_timeout = vio_was_timeout;
vio->vioshutdown = vio_shutdown;
vio->viocancel = vio_cancel;
vio->peer_addr = vio_peer_addr;
vio->io_wait = vio_io_wait;
vio->is_connected = vio_is_connected;
Expand Down Expand Up @@ -393,7 +404,7 @@ bool vio_reset(Vio *vio, enum enum_vio_type type, my_socket sd,
Close socket only when it is not equal to the new one.
*/
if (sd != mysql_socket_getfd(vio->mysql_socket)) {
if (vio->inactive == false) vio->vioshutdown(vio);
if (vio->inactive == false) vio->vioshutdown(vio, SHUT_RDWR);
}
#ifdef HAVE_KQUEUE
else {
Expand Down Expand Up @@ -546,7 +557,7 @@ int vio_timeout(Vio *vio, uint which, int timeout_sec) {

void internal_vio_delete(Vio *vio) {
if (!vio) return; /* It must be safe to delete null pointers. */
if (vio->inactive == false) vio->vioshutdown(vio);
if (vio->inactive == false) vio->vioshutdown(vio, SHUT_RDWR);
vio->~Vio();
my_free(vio);
}
Expand Down
10 changes: 7 additions & 3 deletions vio/vio_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,22 @@

extern PSI_memory_key key_memory_vio;
extern PSI_memory_key key_memory_vio_read_buffer;
extern PSI_memory_key key_memory_vio_proxy_networks;

extern PSI_memory_key key_memory_vio_ssl_fd;

#ifdef _WIN32
size_t vio_read_pipe(Vio *vio, uchar *buf, size_t size);
size_t vio_write_pipe(Vio *vio, const uchar *buf, size_t size);
bool vio_is_connected_pipe(Vio *vio);
int vio_shutdown_pipe(Vio *vio);
int vio_shutdown_pipe(Vio *vio, int how);
int vio_cancel_pipe(Vio *vio, int how);

size_t vio_read_shared_memory(Vio *vio, uchar *buf, size_t size);
size_t vio_write_shared_memory(Vio *vio, const uchar *buf, size_t size);
bool vio_is_connected_shared_memory(Vio *vio);
int vio_shutdown_shared_memory(Vio *vio);
int vio_shutdown_shared_memory(Vio *vio, int how);
int vio_cancel_shared_memory(Vio *vio, int how);
void vio_delete_shared_memory(Vio *vio);
#endif /* _WIN32 */

Expand All @@ -62,7 +66,7 @@ size_t vio_ssl_read(Vio *vio, uchar *buf, size_t size);
size_t vio_ssl_write(Vio *vio, const uchar *buf, size_t size);

/* When the workday is over... */
int vio_ssl_shutdown(Vio *vio);
int vio_ssl_shutdown(Vio *vio, int how);
void vio_ssl_delete(Vio *vio);
bool vio_ssl_has_data(Vio *vio);

Expand Down
41 changes: 39 additions & 2 deletions vio/viopipe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ static size_t wait_overlapped_result(Vio *vio, int timeout) {
timeout_ms = timeout >= 0 ? timeout : INFINITE;

/* Wait for the overlapped operation to be completed. */
wait_status = WaitForSingleObject(vio->overlapped.hEvent, timeout_ms);
wait_status = WaitForSingleObjectEx(vio->overlapped.hEvent, timeout_ms, TRUE);

/* The operation might have completed, attempt to retrieve the result. */
if (wait_status == WAIT_OBJECT_0) {
Expand All @@ -54,11 +54,28 @@ static size_t wait_overlapped_result(Vio *vio, int timeout) {
return ret;
}

/*
Disable posting IO completion event to the port.
In some cases (synchronous timed IO) we want to skip IOCP notifications.
*/
static void disable_iocp_notification(OVERLAPPED *overlapped) {
HANDLE *handle = &(overlapped->hEvent);
*handle = ((HANDLE)((ULONG_PTR)*handle | 1));
}

/* Enable posting IO completion event to the port */
static void enable_iocp_notification(OVERLAPPED *overlapped) {
HANDLE *handle = &(overlapped->hEvent);
*handle = (HANDLE)((ULONG_PTR)*handle & ~1);
}

size_t vio_read_pipe(Vio *vio, uchar *buf, size_t count) {
DWORD transferred;
size_t ret = (size_t)-1;
DBUG_TRACE;

disable_iocp_notification(&vio->pipe_overlapped);

/* Attempt to read from the pipe (overlapped I/O). */
if (ReadFile(vio->hPipe, buf, (DWORD)count, &transferred, &vio->overlapped)) {
/* The operation completed immediately. */
Expand All @@ -68,6 +85,8 @@ size_t vio_read_pipe(Vio *vio, uchar *buf, size_t count) {
else if (GetLastError() == ERROR_IO_PENDING)
ret = wait_overlapped_result(vio, vio->read_timeout);

enable_iocp_notification(&vio->pipe_overlapped);

return ret;
}

Expand All @@ -76,6 +95,8 @@ size_t vio_write_pipe(Vio *vio, const uchar *buf, size_t count) {
size_t ret = (size_t)-1;
DBUG_TRACE;

disable_iocp_notification(&vio->pipe_overlapped);

/* Attempt to write to the pipe (overlapped I/O). */
if (WriteFile(vio->hPipe, buf, (DWORD)count, &transferred,
&vio->overlapped)) {
Expand All @@ -86,6 +107,8 @@ size_t vio_write_pipe(Vio *vio, const uchar *buf, size_t count) {
else if (GetLastError() == ERROR_IO_PENDING)
ret = wait_overlapped_result(vio, vio->write_timeout);

enable_iocp_notification(&vio->pipe_overlapped);

return ret;
}

Expand All @@ -95,7 +118,7 @@ bool vio_is_connected_pipe(Vio *vio) {
return (GetLastError() != ERROR_BROKEN_PIPE);
}

int vio_shutdown_pipe(Vio *vio) {
int vio_shutdown_pipe(Vio *vio, int how) {
BOOL ret = FALSE;
DBUG_TRACE;

Expand All @@ -113,3 +136,17 @@ int vio_shutdown_pipe(Vio *vio) {

return ret;
}

int vio_cancel_pipe(Vio *vio, int how) {
DBUG_ENTER("vio_shutdown_pipe");

CancelIo(vio->hPipe);
CloseHandle(vio->overlapped.hEvent);
DisconnectNamedPipe(vio->hPipe);

vio->inactive = TRUE;

DBUG_RETURN(0);
}

#endif
16 changes: 15 additions & 1 deletion vio/vioshm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ void vio_delete_shared_memory(Vio *vio) {
All handles are closed and the VIO is cleaned up when vio_delete() is
called and this completes the vio cleanup operation in its entirety.
*/
int vio_shutdown_shared_memory(Vio *vio) {
int vio_shutdown_shared_memory(Vio *vio, int how) {
DBUG_TRACE;
if (vio->inactive == false) {
/*
Expand All @@ -208,3 +208,17 @@ int vio_shutdown_shared_memory(Vio *vio) {

return 0;
}

int vio_cancel_shared_memory(Vio *vio, int how) {
DBUG_ENTER("vio_cancel_shred_memory");
if (!vio->inactive) {
/*
Set event_conn_closed for notification of both client and server that
connection is closed
*/
SetEvent(vio->event_conn_closed);
vio->inactive = true;
}

DBUG_RETURN(0);
}
Loading

0 comments on commit 259513e

Please sign in to comment.