From 2937bf5e7612e7ed495178b8e7c2336c16c03224 Mon Sep 17 00:00:00 2001
From: Yossi Itigin
Date: Tue, 22 Dec 2020 19:55:45 +0200
Subject: [PATCH] TOOLS/VFS: Implement mount daemon server loop

---
Note: this copy of the patch was re-flowed from a whitespace-collapsed
extract. Context-line indentation is best effort, the names of the three
system headers in vfs_server.c were restored from the calls they serve, and
the blob ids on the "index" lines predate the review fixes folded into the
hunks below.

 src/tools/vfs/Makefile.am  |   2 +-
 src/tools/vfs/vfs_daemon.h |   9 +
 src/tools/vfs/vfs_main.c   | 133 ++++++++++-
 src/tools/vfs/vfs_server.c | 437 +++++++++++++++++++++++++++++++++++++
 4 files changed, 578 insertions(+), 3 deletions(-)
 create mode 100644 src/tools/vfs/vfs_server.c

diff --git a/src/tools/vfs/Makefile.am b/src/tools/vfs/Makefile.am
index a560c0a4de9..62408f8aea7 100644
--- a/src/tools/vfs/Makefile.am
+++ b/src/tools/vfs/Makefile.am
@@ -9,7 +9,7 @@ if HAVE_FUSE3
 bin_PROGRAMS = ucx_vfs
 ucx_vfs_CPPFLAGS = $(BASE_CPPFLAGS) $(FUSE3_CPPFLAGS)
 ucx_vfs_CFLAGS = $(BASE_CFLAGS)
-ucx_vfs_SOURCES = vfs_main.c
+ucx_vfs_SOURCES = vfs_main.c vfs_server.c
 noinst_HEADERS = vfs_daemon.h
 ucx_vfs_LDADD = $(FUSE3_LIBS) \
                 $(top_builddir)/src/ucs/vfs/sock/libucs_vfs_sock.la
diff --git a/src/tools/vfs/vfs_daemon.h b/src/tools/vfs/vfs_daemon.h
index 523564d9d34..094a1e182c1 100644
--- a/src/tools/vfs/vfs_daemon.h
+++ b/src/tools/vfs/vfs_daemon.h
@@ -52,4 +52,13 @@ typedef struct {
 extern vfs_opts_t g_opts;
 extern const char *vfs_action_names[];
 
+/* Mount a FUSE file system for process 'pid'; returns its fd or negative */
+int vfs_mount(int pid);
+
+/* Unmount the file system of process 'pid' and remove its mount point */
+int vfs_unmount(int pid);
+
+/* Run the daemon server loop on 'listen_fd' until it is asked to stop */
+int vfs_server_loop(int listen_fd);
+
 #endif
diff --git a/src/tools/vfs/vfs_main.c b/src/tools/vfs/vfs_main.c
index 59632087a0b..dc693b7232b 100644
--- a/src/tools/vfs/vfs_main.c
+++ b/src/tools/vfs/vfs_main.c
@@ -99,6 +99,135 @@ static int vfs_run_fusermount(char **extra_argv)
     return 0;
 }
 
+/* Format the mount point path of process 'pid' into 'mountpoint' */
+static void vfs_get_mountpoint(pid_t pid, char *mountpoint, size_t max_length)
+{
+    snprintf(mountpoint, max_length, "%s/%d", g_opts.mountpoint_dir, pid);
+}
+
+/*
+ * Build the file system name of process 'pid' in 'buf': the command name read
+ * from /proc/<pid>/comm followed by "@pid:<pid>", or just "pid:<pid>" if the
+ * command name cannot be read. Returns 'buf'.
+ */
+static const char *vfs_get_process_name(int pid, char *buf, size_t max_length)
+{
+    char procfs_comm[NAME_MAX];
+    size_t length;
+    FILE *file;
+    char *line;
+
+    /* open /proc/<pid>/comm to read command name */
+    snprintf(procfs_comm, sizeof(procfs_comm), "/proc/%d/comm", pid);
+    file = fopen(procfs_comm, "r");
+    if (file == NULL) {
+        goto err;
+    }
+
+    /* read command to buffer, and close the file on success and failure */
+    line = fgets(buf, max_length, file);
+    fclose(file);
+    if (line == NULL) {
+        goto err;
+    }
+
+    /* remove trailing space/newline, without stepping before the buffer */
+    length = strlen(buf);
+    while ((length > 0) && isspace((unsigned char)buf[length - 1])) {
+        buf[--length] = '\0';
+    }
+
+    /* append process id */
+    snprintf(buf + length, max_length - length, "@pid:%d", pid);
+    return buf;
+
+err:
+    snprintf(buf, max_length, "pid:%d", pid);
+    return buf;
+}
+
+/*
+ * Create the mount point directory of process 'pid' and mount a FUSE file
+ * system on it. Returns the FUSE channel fd, or a negative value on failure.
+ */
+int vfs_mount(int pid)
+{
+    char mountpoint[PATH_MAX];
+    char mountopts[1024];
+    char name[NAME_MAX];
+    int fuse_fd, ret;
+
+    /* Add common mount options:
+     * - File system name (source) : process name and pid
+     * - File system type          : ucx_vfs
+     * - Enable permissions check  : yes
+     * - Direct IO (no caching)    : yes
+     */
+    ret = snprintf(
+            mountopts, sizeof(mountopts),
+            "fsname=%s,subtype=ucx_vfs,default_permissions,direct_io%s%s",
+            vfs_get_process_name(pid, name, sizeof(name)),
+            (strlen(g_opts.mount_opts) > 0) ? "," : "", g_opts.mount_opts);
+    if ((ret < 0) || ((size_t)ret >= sizeof(mountopts))) {
+        return -ENOMEM;
+    }
+
+    /* Create the mount point directory, and ignore "already exists" error */
+    vfs_get_mountpoint(pid, mountpoint, sizeof(mountpoint));
+    ret = mkdir(mountpoint, S_IRWXU);
+    if ((ret < 0) && (errno != EEXIST)) {
+        ret = -errno;
+        vfs_error("failed to create directory '%s': %m", mountpoint);
+        return ret;
+    }
+
+    /* Mount a new FUSE filesystem in the mount point directory */
+    vfs_log("mounting directory '%s' with options '%s'", mountpoint, mountopts);
+    fuse_fd = fuse_open_channel(mountpoint, mountopts);
+    if (fuse_fd < 0) {
+        vfs_error("fuse_open_channel(%s,opts=%s) failed: %m", mountpoint,
+                  mountopts);
+        return fuse_fd;
+    }
+
+    vfs_log("mounted directory '%s' with fd %d", mountpoint, fuse_fd);
+    return fuse_fd;
+}
+
+/*
+ * Unmount the FUSE file system of process 'pid' and remove its mount point
+ * directory. Returns 0 on success, or a negative value on failure.
+ */
+int vfs_unmount(int pid)
+{
+    char mountpoint[PATH_MAX];
+    char *argv[5];
+    int ret;
+
+    /* Unmount FUSE file system */
+    vfs_get_mountpoint(pid, mountpoint, sizeof(mountpoint));
+    argv[0] = "-u";
+    argv[1] = "-z";
+    argv[2] = "--";
+    argv[3] = mountpoint;
+    argv[4] = NULL;
+    ret = vfs_run_fusermount(argv);
+    if (ret < 0) {
+        return ret;
+    }
+
+    /* Remove mount point directory */
+    vfs_log("removing directory '%s'", mountpoint);
+    ret = rmdir(mountpoint);
+    if (ret < 0) {
+        ret = -errno;
+        vfs_error("failed to remove directory '%s': %m", mountpoint);
+        return ret;
+    }
+
+    return 0;
+}
+
 static int vfs_unlink_socket(int silent_notexist)
 {
     int ret;
@@ -155,8 +284,8 @@ static int vfs_listen(int silent_addinuse_err)
         goto out_unlink;
     }
 
-    vfs_error("closing socket, TODO implement listening for connections");
-    close(listen_fd);
+    vfs_log("listening for connections on '%s'", g_sockaddr.sun_path);
+    ret = vfs_server_loop(listen_fd);
 
 out_unlink:
     vfs_unlink_socket(0);
diff --git a/src/tools/vfs/vfs_server.c b/src/tools/vfs/vfs_server.c
new file mode 100644
index 00000000000..ac724b303b2
--- /dev/null
+++ b/src/tools/vfs/vfs_server.c
@@ -0,0 +1,437 @@
+/**
+ * Copyright (C) Mellanox Technologies Ltd. 2020. ALL RIGHTS RESERVED.
+ *
+ * See file LICENSE for terms.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "vfs_daemon.h"
+
+#include <fcntl.h>
+#include <poll.h>
+#include <signal.h>
+
+
+#define VFS_MAX_FDS 1024
+
+typedef enum {
+    VFS_FD_STATE_LISTENING,
+    VFS_FD_STATE_ACCEPTED,
+    VFS_FD_STATE_MOUNTED,
+    VFS_FD_STATE_FD_SENT,
+    VFS_FD_STATE_CLOSED
+} vfs_socket_state_t;
+
+typedef struct {
+    vfs_socket_state_t state;
+    pid_t              pid;
+    int                fuse_fd;
+} vfs_server_fd_state_t;
+
+typedef struct {
+    vfs_server_fd_state_t fd_state[VFS_MAX_FDS];
+    struct pollfd         poll_fds[VFS_MAX_FDS];
+    int                   nfds;
+    /* Written from the signal handler, so it must be sig_atomic_t */
+    volatile sig_atomic_t stop;
+} vfs_server_context_t;
+
+static vfs_server_context_t vfs_server_context;
+
+static const char *vfs_server_fd_state_names[] = {
+    [VFS_FD_STATE_LISTENING] = "LISTENING",
+    [VFS_FD_STATE_ACCEPTED]  = "ACCEPTED",
+    [VFS_FD_STATE_MOUNTED]   = "MOUNTED",
+    [VFS_FD_STATE_FD_SENT]   = "FD_SENT",
+    [VFS_FD_STATE_CLOSED]    = "CLOSED"
+};
+
+/* Log a summary of all tracked fds, with poll results if 'events' != 0 */
+static void vfs_server_log_context(int events)
+{
+    vfs_server_fd_state_t *fd_state;
+    char log_message[1024];
+    struct pollfd *pfd;
+    char *p, *endp;
+    int idx;
+
+    if (g_opts.verbose < 2) {
+        return;
+    }
+
+    p    = log_message;
+    endp = log_message + sizeof(log_message);
+    *p   = '\0';
+
+    for (idx = 0; idx < vfs_server_context.nfds; ++idx) {
+        pfd      = &vfs_server_context.poll_fds[idx];
+        fd_state = &vfs_server_context.fd_state[idx];
+        snprintf(p, endp - p, "[%d]{%c %d%s%s %d} ", idx,
+                 vfs_server_fd_state_names[fd_state->state][0], pfd->fd,
+                 (events && (pfd->revents & POLLIN)) ? "i" : "",
+                 (events && (pfd->revents & POLLOUT)) ? "o" : "",
+                 fd_state->pid);
+        p += strlen(p);
+    }
+
+    /* Strip the trailing space, unless nothing was printed at all */
+    if (p > log_message) {
+        *(p - 1) = '\0';
+    }
+
+    vfs_log("%s", log_message);
+}
+
+/* Wait for events on all tracked fds. Returns 0, or negative errno value */
+static int vfs_server_poll_events(void)
+{
+    int ret;
+
+    vfs_server_log_context(0);
+
+    ret = poll(vfs_server_context.poll_fds, vfs_server_context.nfds, -1);
+    if (ret < 0) {
+        ret = -errno;
+        if (errno != EINTR) {
+            vfs_error("poll(nfds=%d) failed: %m", vfs_server_context.nfds);
+        }
+        return ret;
+    }
+
+    vfs_server_log_context(1);
+    return 0;
+}
+
+/* Close 'fd' and report a failure to do so */
+static void vfs_server_close_fd(int fd)
+{
+    int ret = close(fd);
+    if (ret < 0) {
+        vfs_error("failed to close fd %d: %m", fd);
+    }
+}
+
+/* Log the state of the fd at index 'idx', prefixed by 'message' */
+static void vfs_server_log_fd(int idx, const char *message)
+{
+    vfs_server_fd_state_t *fd_state = &vfs_server_context.fd_state[idx];
+    struct pollfd *pfd              = &vfs_server_context.poll_fds[idx];
+
+    vfs_log("%s fd[%d]=%d %s, pid: %d fuse_fd: %d", message, idx, pfd->fd,
+            vfs_server_fd_state_names[fd_state->state], fd_state->pid,
+            fd_state->fuse_fd);
+}
+
+/*
+ * Set 'fd' to non-blocking mode and start tracking it in the given state.
+ * Returns 0 on success, or a negative errno value; on failure the fd is not
+ * tracked and remains owned by the caller.
+ */
+static int vfs_server_add_fd(int fd, vfs_socket_state_t state)
+{
+    int idx, flags, ret;
+
+    if (vfs_server_context.nfds >= VFS_MAX_FDS) {
+        vfs_error("cannot add fd %d: limit of %d fds reached", fd,
+                  VFS_MAX_FDS);
+        return -ENOMEM;
+    }
+
+    flags = fcntl(fd, F_GETFL);
+    if (flags < 0) {
+        ret = -errno;
+        vfs_error("fcntl(%d, F_GETFL) failed: %m", fd);
+        return ret;
+    }
+
+    ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+    if (ret < 0) {
+        ret = -errno;
+        vfs_error("fcntl(%d, F_SETFL) failed: %m", fd);
+        return ret;
+    }
+
+    idx = vfs_server_context.nfds++;
+    vfs_server_context.fd_state[idx].state   = state;
+    vfs_server_context.fd_state[idx].pid     = -1;
+    vfs_server_context.fd_state[idx].fuse_fd = -1;
+    vfs_server_context.poll_fds[idx].events  = POLLIN;
+    vfs_server_context.poll_fds[idx].fd      = fd;
+    vfs_server_context.poll_fds[idx].revents = 0;
+
+    vfs_server_log_fd(idx, "added");
+    return 0;
+}
+
+/* Release the resources of the fd at index 'idx' and mark it as closed */
+static void vfs_server_remove_fd(int idx)
+{
+    vfs_server_log_fd(idx, "removing");
+
+    switch (vfs_server_context.fd_state[idx].state) {
+    case VFS_FD_STATE_FD_SENT:
+    case VFS_FD_STATE_MOUNTED:
+        vfs_server_close_fd(vfs_server_context.fd_state[idx].fuse_fd);
+        vfs_unmount(vfs_server_context.fd_state[idx].pid);
+        /* Fall through */
+    case VFS_FD_STATE_ACCEPTED:
+        vfs_server_close_fd(vfs_server_context.poll_fds[idx].fd);
+        /* Fall through */
+    default:
+        break;
+    }
+
+    vfs_server_context.fd_state[idx].state   = VFS_FD_STATE_CLOSED;
+    vfs_server_context.fd_state[idx].pid     = -1;
+    vfs_server_context.fd_state[idx].fuse_fd = -1;
+    vfs_server_context.poll_fds[idx].events  = 0;
+    vfs_server_context.poll_fds[idx].fd      = -1;
+    vfs_server_context.poll_fds[idx].revents = 0;
+}
+
+/* Remove all tracked fds, last to first */
+static void vfs_server_remove_all_fds(void)
+{
+    while (vfs_server_context.nfds > 0) {
+        vfs_server_remove_fd(--vfs_server_context.nfds);
+    }
+}
+
+/* Accept a new connection on 'listen_fd' and start tracking it */
+static void vfs_server_accept(int listen_fd)
+{
+    int ret, connfd;
+
+    connfd = accept(listen_fd, NULL, NULL);
+    if (connfd < 0) {
+        vfs_error("accept(listen_fd=%d) failed: %m", listen_fd);
+        return;
+    }
+
+    ret = ucs_vfs_sock_setopt_passcred(connfd);
+    if (ret < 0) {
+        close(connfd);
+        return;
+    }
+
+    /* The connection is not tracked if adding it failed, so close it here */
+    ret = vfs_server_add_fd(connfd, VFS_FD_STATE_ACCEPTED);
+    if (ret < 0) {
+        vfs_server_close_fd(connfd);
+    }
+}
+
+/* Mount a file system for 'pid', and schedule sending its fd to the peer */
+static void vfs_server_mount(int idx, pid_t pid)
+{
+    int fuse_fd;
+
+    if (pid < 0) {
+        vfs_error("received invalid pid: %d", pid);
+        vfs_server_remove_fd(idx);
+        return;
+    }
+
+    fuse_fd = vfs_mount(pid);
+    if (fuse_fd < 0) {
+        vfs_server_remove_fd(idx);
+        return;
+    }
+
+    vfs_server_context.fd_state[idx].state   = VFS_FD_STATE_MOUNTED;
+    vfs_server_context.fd_state[idx].pid     = pid;
+    vfs_server_context.fd_state[idx].fuse_fd = fuse_fd;
+    vfs_server_context.poll_fds[idx].events |= POLLOUT;
+}
+
+/* Printable name of a socket action; safe for any received value */
+static const char *vfs_server_action_name(int action)
+{
+    switch (action) {
+    case UCS_VFS_SOCK_ACTION_STOP:
+        return "stop";
+    case UCS_VFS_SOCK_ACTION_MOUNT:
+        return "mount";
+    case UCS_VFS_SOCK_ACTION_MOUNT_REPLY:
+        return "mount_reply";
+    case UCS_VFS_SOCK_ACTION_NOP:
+        return "nop";
+    default:
+        return "unknown";
+    }
+}
+
+/* Receive a request from the connection at index 'idx' and act on it */
+static void vfs_server_recv(int idx)
+{
+    ucs_vfs_sock_message_t vfs_msg_in;
+    char message[64];
+    int ret;
+
+    ret = ucs_vfs_sock_recv(vfs_server_context.poll_fds[idx].fd, &vfs_msg_in);
+    if (ret < 0) {
+        vfs_error("failed to receive a message: %d (%s)", ret, strerror(-ret));
+        vfs_server_remove_fd(idx);
+        return;
+    }
+
+    /* The action comes from the peer, so never use it as an array index */
+    snprintf(message, sizeof(message), "got action '%s' on",
+             vfs_server_action_name(vfs_msg_in.action));
+    vfs_server_log_fd(idx, message);
+
+    switch (vfs_msg_in.action) {
+    case UCS_VFS_SOCK_ACTION_STOP:
+        vfs_server_context.stop = 1;
+        break;
+    case UCS_VFS_SOCK_ACTION_MOUNT:
+        vfs_server_mount(idx, vfs_msg_in.pid);
+        break;
+    case UCS_VFS_SOCK_ACTION_NOP:
+        vfs_server_remove_fd(idx);
+        break;
+    default:
+        vfs_error("ignoring invalid action %d", vfs_msg_in.action);
+        vfs_server_remove_fd(idx);
+        break;
+    }
+}
+
+/* Handle a POLLIN event according to the state of the fd at index 'idx' */
+static void vfs_server_handle_pollin(int idx)
+{
+    switch (vfs_server_context.fd_state[idx].state) {
+    case VFS_FD_STATE_LISTENING:
+        vfs_server_accept(vfs_server_context.poll_fds[idx].fd);
+        break;
+    case VFS_FD_STATE_ACCEPTED:
+        vfs_server_recv(idx);
+        break;
+    case VFS_FD_STATE_FD_SENT:
+        vfs_server_remove_fd(idx);
+        break;
+    default:
+        vfs_server_log_fd(idx, "unexpected POLLIN event on");
+        vfs_server_remove_fd(idx);
+        break;
+    }
+}
+
+/* Handle a POLLOUT event: send the fuse fd of a mounted connection */
+static void vfs_server_handle_pollout(int idx)
+{
+    ucs_vfs_sock_message_t vfs_msg_out;
+    int ret;
+
+    if (vfs_server_context.fd_state[idx].state != VFS_FD_STATE_MOUNTED) {
+        vfs_server_log_fd(idx, "unexpected POLLOUT event on");
+        vfs_server_remove_fd(idx);
+        return;
+    }
+
+    /* Send reply with file descriptor from fuse mount */
+    vfs_msg_out.action = UCS_VFS_SOCK_ACTION_MOUNT_REPLY;
+    vfs_msg_out.fd     = vfs_server_context.fd_state[idx].fuse_fd;
+    ret = ucs_vfs_sock_send(vfs_server_context.poll_fds[idx].fd, &vfs_msg_out);
+    if (ret < 0) {
+        vfs_error("failed to send a message: %d", ret);
+        vfs_server_remove_fd(idx);
+        return;
+    }
+
+    vfs_server_log_fd(idx, "sent fuse_fd on");
+    vfs_server_context.fd_state[idx].state   = VFS_FD_STATE_FD_SENT;
+    vfs_server_context.poll_fds[idx].events &= ~POLLOUT;
+}
+
+/* Copy the fd entry at 'src_idx' to 'dest_idx', used to compact the arrays */
+static void vfs_server_copy_fd_state(int dest_idx, int src_idx)
+{
+    if (dest_idx != src_idx) {
+        vfs_server_context.fd_state[dest_idx] =
+                vfs_server_context.fd_state[src_idx];
+        vfs_server_context.poll_fds[dest_idx] =
+                vfs_server_context.poll_fds[src_idx];
+    }
+}
+
+/* Signal handler: only request the server loop to stop */
+static void vfs_server_sighandler(int signo)
+{
+    vfs_server_context.stop = 1;
+}
+
+/* Stop the server loop on SIGINT, SIGHUP and SIGTERM */
+static void vfs_server_set_sighandler(void)
+{
+    struct sigaction sigact;
+
+    sigact.sa_handler = vfs_server_sighandler;
+    sigact.sa_flags   = 0;
+    sigemptyset(&sigact.sa_mask);
+
+    sigaction(SIGINT, &sigact, NULL);
+    sigaction(SIGHUP, &sigact, NULL);
+    sigaction(SIGTERM, &sigact, NULL);
+}
+
+/*
+ * Serve connections accepted on 'listen_fd' until a stop request or a signal
+ * arrives, or polling fails. All connections and mounts are released before
+ * returning. Returns 0 on a normal stop, or a negative errno value.
+ */
+int vfs_server_loop(int listen_fd)
+{
+    int idx, valid_idx;
+    int ret;
+
+    vfs_server_context.nfds = 0;
+    vfs_server_context.stop = 0;
+
+    vfs_server_set_sighandler();
+
+    ret = vfs_server_add_fd(listen_fd, VFS_FD_STATE_LISTENING);
+    if (ret < 0) {
+        return ret;
+    }
+
+    while (!vfs_server_context.stop) {
+        ret = vfs_server_poll_events();
+        if (ret == -EINTR) {
+            /* Interrupted by a signal: re-check the stop flag */
+            ret = 0;
+            continue;
+        } else if (ret < 0) {
+            /* Leave the loop, to unmount and close everything below */
+            break;
+        }
+
+        /* Handle events, and compact the arrays over removed entries */
+        valid_idx = 0;
+        for (idx = 0; idx < vfs_server_context.nfds; ++idx) {
+            if (vfs_server_context.poll_fds[idx].events == 0) {
+                vfs_server_copy_fd_state(valid_idx++, idx);
+                continue;
+            }
+
+            if (vfs_server_context.poll_fds[idx].revents & POLLIN) {
+                vfs_server_handle_pollin(idx);
+            }
+            if (vfs_server_context.poll_fds[idx].revents & POLLOUT) {
+                vfs_server_handle_pollout(idx);
+            }
+
+            if (vfs_server_context.fd_state[idx].state != VFS_FD_STATE_CLOSED) {
+                vfs_server_copy_fd_state(valid_idx++, idx);
+            }
+        }
+
+        vfs_server_context.nfds = valid_idx;
+    }
+
+    vfs_server_remove_all_fds();
+    return ret;
+}