Skip to content

Commit

Permalink
Share file offset across processes
Browse files Browse the repository at this point in the history
This change ensures that if a file descriptor for an open disk file gets
shared by multiple processes within a process tree, then lseek() changes
will be visible across processes, and read() / write() are synchronized.
Note this only applies to Windows, because UNIX kernels already do this.
  • Loading branch information
jart committed Aug 3, 2024
1 parent a80ab3f commit 761c6ad
Show file tree
Hide file tree
Showing 15 changed files with 256 additions and 63 deletions.
5 changes: 4 additions & 1 deletion libc/calls/close-nt.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
│ PERFORMANCE OF THIS SOFTWARE. │
╚─────────────────────────────────────────────────────────────────────────────*/
#include "libc/calls/internal.h"
#include "libc/intrin/fds.h"
#include "libc/calls/syscall-nt.internal.h"
#include "libc/calls/syscall_support-nt.internal.h"
#include "libc/intrin/fds.h"
#include "libc/intrin/weaken.h"
#include "libc/nt/enum/filetype.h"
#include "libc/nt/files.h"
#include "libc/nt/runtime.h"
#include "libc/runtime/runtime.h"
#include "libc/runtime/zipos.internal.h"
#include "libc/sock/syscall_fd.internal.h"
#include "libc/sysv/consts/o.h"
Expand Down Expand Up @@ -64,5 +65,7 @@ textwindows int sys_close_nt(int fd, int fildes) {
default:
break;
}
if (f->shared && !f->isdup)
munmap(f->shared, sizeof(struct Cursor));
return CloseHandle(f->handle) ? 0 : __winerr();
}
2 changes: 2 additions & 0 deletions libc/calls/dup-nt.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ static textwindows int sys_dup_nt_impl(int oldfd, int newfd, int flags,

g_fds.p[newfd] = g_fds.p[oldfd];
g_fds.p[newfd].handle = handle;
g_fds.p[newfd].isdup = true;
g_fds.p[oldfd].isdup = true; // TODO(jart): is it possible to avoid leak?
if (flags & _O_CLOEXEC) {
g_fds.p[newfd].flags |= _O_CLOEXEC;
} else {
Expand Down
15 changes: 10 additions & 5 deletions libc/calls/fcntl-nt.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
#include "libc/calls/calls.h"
#include "libc/calls/createfileflags.internal.h"
#include "libc/calls/internal.h"
#include "libc/intrin/fds.h"
#include "libc/calls/struct/flock.h"
#include "libc/calls/struct/sigset.internal.h"
#include "libc/calls/syscall-nt.internal.h"
#include "libc/calls/syscall_support-nt.internal.h"
#include "libc/calls/wincrash.internal.h"
#include "libc/errno.h"
#include "libc/intrin/fds.h"
#include "libc/intrin/kprintf.h"
#include "libc/intrin/weaken.h"
#include "libc/limits.h"
Expand Down Expand Up @@ -151,7 +151,7 @@ static textwindows int sys_fcntl_nt_lock(struct Fd *f, int fd, int cmd,
case SEEK_SET:
break;
case SEEK_CUR:
off = f->pointer + off;
off = f->shared->pointer + off;
break;
case SEEK_END: {
int64_t size;
Expand Down Expand Up @@ -351,9 +351,14 @@ textwindows int sys_fcntl_nt(int fd, int cmd, uintptr_t arg) {
}
rc = 0;
} else if (cmd == F_SETLK || cmd == F_SETLKW || cmd == F_GETLK) {
pthread_mutex_lock(&g_locks.mu);
rc = sys_fcntl_nt_lock(g_fds.p + fd, fd, cmd, arg);
pthread_mutex_unlock(&g_locks.mu);
struct Fd *f = g_fds.p + fd;
if (f->shared) {
pthread_mutex_lock(&g_locks.mu);
rc = sys_fcntl_nt_lock(f, fd, cmd, arg);
pthread_mutex_unlock(&g_locks.mu);
} else {
rc = ebadf();
}
} else if (cmd == F_DUPFD || cmd == F_DUPFD_CLOEXEC) {
rc = sys_fcntl_nt_dupfd(fd, cmd, arg);
} else {
Expand Down
13 changes: 8 additions & 5 deletions libc/calls/lseek-nt.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ static textwindows int64_t GetPosition(struct Fd *f, int whence) {
case SEEK_SET:
return 0;
case SEEK_CUR:
return f->pointer;
return f->shared->pointer;
case SEEK_END: {
struct NtByHandleFileInformation wst;
if (!GetFileInformationByHandle(f->handle, &wst)) {
Expand Down Expand Up @@ -67,11 +67,14 @@ textwindows int64_t sys_lseek_nt(int fd, int64_t offset, int whence) {
} else if (__isfdkind(fd, kFdFile)) {
struct Fd *f = g_fds.p + fd;
int filetype = GetFileType(f->handle);
if (filetype != kNtFileTypePipe && filetype != kNtFileTypeChar) {
if (filetype != kNtFileTypePipe && //
filetype != kNtFileTypeChar && //
f->shared) {
int64_t res;
if ((res = Seek(f, offset, whence)) != -1) {
f->pointer = res;
}
__fd_lock(f);
if ((res = Seek(f, offset, whence)) != -1)
f->shared->pointer = res;
__fd_unlock(f);
return res;
} else {
return espipe();
Expand Down
5 changes: 3 additions & 2 deletions libc/calls/open-nt.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ static textwindows int sys_open_nt_file(int dirfd, const char *file,
int64_t handle;
if ((handle = sys_open_nt_impl(dirfd, file, flags, mode,
kNtFileFlagOverlapped)) != -1) {
g_fds.p[fd].shared = __cursor_new();
g_fds.p[fd].handle = handle;
g_fds.p[fd].kind = kFdFile;
g_fds.p[fd].flags = flags;
Expand Down Expand Up @@ -170,14 +171,14 @@ static textwindows int sys_open_nt_no_handle(int fd, int flags, int mode,

static textwindows int sys_open_nt_dup(int fd, int flags, int mode, int oldfd) {
int64_t handle;
if (!__isfdopen(oldfd)) {
if (!__isfdopen(oldfd))
return enoent();
}
if (DuplicateHandle(GetCurrentProcess(), g_fds.p[oldfd].handle,
GetCurrentProcess(), &handle, 0, true,
kNtDuplicateSameAccess)) {
g_fds.p[fd] = g_fds.p[oldfd];
g_fds.p[fd].handle = handle;
g_fds.p[fd].isdup = true;
g_fds.p[fd].mode = mode;
if (!sys_fcntl_nt_setfl(fd, flags)) {
return fd;
Expand Down
51 changes: 31 additions & 20 deletions libc/calls/readwrite-nt.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
#include "libc/calls/createfileflags.internal.h"
#include "libc/calls/internal.h"
#include "libc/calls/sig.internal.h"
#include "libc/intrin/fds.h"
#include "libc/calls/struct/sigset.h"
#include "libc/calls/syscall_support-nt.internal.h"
#include "libc/intrin/fds.h"
#include "libc/intrin/weaken.h"
#include "libc/nt/enum/filetype.h"
#include "libc/nt/errors.h"
Expand Down Expand Up @@ -51,29 +51,28 @@ sys_readwrite_nt(int fd, void *data, size_t size, ssize_t offset,
uint32_t exchanged;
struct Fd *f = g_fds.p + fd;

// win32 i/o apis generally take 32-bit values thus we implicitly
// truncate outrageously large sizes. linux actually does it too!
size = MIN(size, 0x7ffff000);

// pread() and pwrite() perform an implicit lseek() operation, so
// similar to the lseek() system call, they too raise ESPIPE when
// operating on a non-seekable file.
bool pwriting = offset != -1;
bool seekable =
(f->kind == kFdFile && GetFileType(handle) == kNtFileTypeDisk) ||
f->kind == kFdDevNull || f->kind == kFdDevRandom;
if (pwriting && !seekable) {
bool isdisk = f->kind == kFdFile && GetFileType(handle) == kNtFileTypeDisk;
bool seekable = isdisk || f->kind == kFdDevNull || f->kind == kFdDevRandom;
if (pwriting && !seekable)
return espipe();
}

// determine if we need to lock a file descriptor across processes
bool locked = isdisk && !pwriting && f->shared;
if (locked)
__fd_lock(f);

// when a file is opened in overlapped mode win32 requires that we
// take over full responsibility for managing our own file pointer
// which is fine, because the one win32 has was never very good in
// the sense that it behaves so differently from linux, that using
// win32 i/o required more compatibility toil than doing it by hand
if (!pwriting) {
if (seekable) {
offset = f->pointer;
if (seekable && f->shared) {
offset = f->shared->pointer;
} else {
offset = 0;
}
Expand All @@ -82,8 +81,11 @@ sys_readwrite_nt(int fd, void *data, size_t size, ssize_t offset,
RestartOperation:
bool eagained = false;
// check for signals and cancelation
if (_check_cancel() == -1)
if (_check_cancel() == -1) {
if (locked)
__fd_unlock(f);
return -1; // ECANCELED
}
if (_weaken(__sig_get) && (sig = _weaken(__sig_get)(waitmask))) {
goto HandleInterrupt;
}
Expand Down Expand Up @@ -114,40 +116,49 @@ sys_readwrite_nt(int fd, void *data, size_t size, ssize_t offset,
}
ok = true;
}
if (ok) {
if (ok)
ok = GetOverlappedResult(handle, &overlap, &exchanged, true);
}
CloseHandle(overlap.hEvent);

// if i/o succeeded then return its result
if (ok) {
if (!pwriting && seekable) {
f->pointer = offset + exchanged;
}
if (!pwriting && seekable && f->shared)
f->shared->pointer = offset + exchanged;
if (locked)
__fd_unlock(f);
return exchanged;
}

// only raise EINTR or EAGAIN if I/O got canceled
if (GetLastError() == kNtErrorOperationAborted) {
// raise EAGAIN if it's due to O_NONBLOCK mode
if (eagained) {
if (locked)
__fd_unlock(f);
return eagain();
}
// otherwise it must be due to a kill() via __sig_cancel()
if (_weaken(__sig_relay) && (sig = _weaken(__sig_get)(waitmask))) {
HandleInterrupt:
if (locked)
__fd_unlock(f);
int handler_was_called = _weaken(__sig_relay)(sig, SI_KERNEL, waitmask);
if (_check_cancel() == -1)
return -1; // possible if we SIGTHR'd
if (locked)
__fd_lock(f);
// read() is @restartable unless non-SA_RESTART handlers were called
if (!(handler_was_called & SIG_HANDLED_NO_RESTART)) {
if (!(handler_was_called & SIG_HANDLED_NO_RESTART))
goto RestartOperation;
}
}
if (locked)
__fd_unlock(f);
return eintr();
}

// read() and write() have generally different error-handling paths
if (locked)
__fd_unlock(f);
return -2;
}

Expand Down
22 changes: 20 additions & 2 deletions libc/intrin/fds.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
│ TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR │
│ PERFORMANCE OF THIS SOFTWARE. │
╚─────────────────────────────────────────────────────────────────────────────*/
#include "libc/intrin/fds.h"
#include "libc/calls/internal.h"
#include "libc/calls/state.internal.h"
#include "libc/calls/ttydefaults.h"
#include "libc/dce.h"
#include "libc/intrin/atomic.h"
#include "libc/intrin/extend.h"
#include "libc/intrin/fds.h"
#include "libc/intrin/kprintf.h"
#include "libc/intrin/nomultics.h"
#include "libc/intrin/pushpop.h"
Expand All @@ -40,6 +40,7 @@
#include "libc/sock/sock.h"
#include "libc/sysv/consts/map.h"
#include "libc/sysv/consts/o.h"
#include "libc/sysv/consts/prot.h"
#include "libc/thread/thread.h"

#define OPEN_MAX 16
Expand Down Expand Up @@ -156,12 +157,29 @@ textstartup void __init_fds(int argc, char **argv, char **envp) {
f->kind = kind;
f->flags = flags;
f->mode = mode;
f->pointer = pointer;
f->type = type;
f->family = family;
f->protocol = protocol;
atomic_store_explicit(&fds->f, fd + 1, memory_order_relaxed);

//
// - v1 abi: This field was originally the file pointer.
//
// - v2 abi: This field is the negated shared memory address.
//
if (f->kind == kFdFile) {
if (pointer < 0) {
f->shared = (struct Cursor *)(uintptr_t)-pointer;
} else if ((f->shared = __cursor_new())) {
f->shared->pointer = pointer;
}
}
}
}
for (int i = 0; i < 3; ++i) {
struct Fd *f = fds->p + i;
if (f->kind == kFdFile && !f->shared)
f->shared = __cursor_new();
}
}
}
14 changes: 13 additions & 1 deletion libc/intrin/fds.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#ifndef COSMOPOLITAN_LIBC_CALLS_STRUCT_FD_INTERNAL_H_
#define COSMOPOLITAN_LIBC_CALLS_STRUCT_FD_INTERNAL_H_
#include "libc/atomic.h"
#include "libc/thread/thread.h"
COSMOPOLITAN_C_START_

#define kFdEmpty 0
Expand All @@ -13,19 +15,25 @@ COSMOPOLITAN_C_START_
#define kFdDevNull 9
#define kFdDevRandom 10

// File position state for an open file, bundled with the mutex that
// guards it. __cursor_new() places this object in memory obtained from
// _mapshared() and initializes the lock as PTHREAD_PROCESS_SHARED.
// NOTE(review): the address of this object appears to be handed to
// other processes (see the "v2 abi" note in __init_fds), so the field
// layout presumably needs to stay stable - confirm before reordering.
struct Cursor {
pthread_mutex_t lock; // acquired by __fd_lock(), released by __fd_unlock()
long pointer; // file offset consulted and updated by read/write/lseek
};

// Per-descriptor state; entries are addressed as g_fds.p[fd].
struct Fd {
char kind; // one of the kFd* constants, e.g. kFdFile
bool isdup; // set by the dup paths; close skips munmap of `shared` when set
bool isbound;
unsigned flags;
unsigned mode;
long handle; // win32 handle, e.g. what gets passed to CloseHandle()
// NOTE(review): the seek and i/o code in this change uses
// shared->pointer rather than this field - confirm it is still wanted.
long pointer;
int family;
int type;
int protocol;
unsigned rcvtimeo; /* millis; 0 means wait forever */
unsigned sndtimeo; /* millis; 0 means wait forever */
void *connect_op;
// Cursor created by __cursor_new(); may be null, so callers test it
// before calling __fd_lock() or reading shared->pointer.
struct Cursor *shared;
};

struct Fds {
Expand All @@ -34,5 +42,9 @@ struct Fds {
struct Fd *p, *e;
};

// Locks the mutex stored in the descriptor's shared cursor. The
// descriptor's `shared` member must be non-null.
void __fd_lock(struct Fd *);
// Unlocks the mutex stored in the descriptor's shared cursor. The
// descriptor's `shared` member must be non-null.
void __fd_unlock(struct Fd *);
// Creates a new cursor whose mutex is initialized as process-shared.
// Returns null if the backing memory could not be obtained.
struct Cursor *__cursor_new(void);

COSMOPOLITAN_C_END_
#endif /* COSMOPOLITAN_LIBC_CALLS_STRUCT_FD_INTERNAL_H_ */
22 changes: 22 additions & 0 deletions libc/intrin/fds_lock.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
│ PERFORMANCE OF THIS SOFTWARE. │
╚─────────────────────────────────────────────────────────────────────────────*/
#include "libc/calls/state.internal.h"
#include "libc/intrin/fds.h"
#include "libc/runtime/runtime.h"
#include "libc/thread/thread.h"

void __fds_lock(void) {
Expand All @@ -26,3 +28,23 @@ void __fds_lock(void) {
// Releases the __fds_lock_obj mutex.
void __fds_unlock(void) {
pthread_mutex_unlock(&__fds_lock_obj);
}

// Acquires the mutex embedded in f->shared. f->shared is dereferenced
// unconditionally, so the caller must already know that it is non-null.
void __fd_lock(struct Fd *f) {
pthread_mutex_lock(&f->shared->lock);
}

// Releases the mutex embedded in f->shared. f->shared is dereferenced
// unconditionally, so the caller must already know that it is non-null.
void __fd_unlock(struct Fd *f) {
pthread_mutex_unlock(&f->shared->lock);
}

// Allocates a cursor in memory obtained from _mapshared() and sets up
// its mutex with the PTHREAD_PROCESS_SHARED attribute.
//
// Returns the new cursor, or null when _mapshared() yields no memory.
// Only the lock is initialized here; the pointer field keeps whatever
// value the mapping came with.
struct Cursor *__cursor_new(void) {
  struct Cursor *cursor = _mapshared(sizeof(struct Cursor));
  if (!cursor)
    return cursor;

  // the lock has to work across process boundaries, not just threads
  pthread_mutexattr_t shared_attr;
  pthread_mutexattr_init(&shared_attr);
  pthread_mutexattr_setpshared(&shared_attr, PTHREAD_PROCESS_SHARED);
  pthread_mutex_init(&cursor->lock, &shared_attr);
  pthread_mutexattr_destroy(&shared_attr);
  return cursor;
}
File renamed without changes.
Loading

0 comments on commit 761c6ad

Please sign in to comment.