WIP: Make operations on named pipes more consistent by fwerner · Pull Request #5 · fwerner/bufio · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

WIP: Make operations on named pipes more consistent #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8000
96 changes: 80 additions & 16 deletions src/bufio.c
Original file line number Diff line number Diff line change
Expand Up @@ -347,14 +347,43 @@ static void ignore_sigpipe(int socket __attribute__((__unused__)))


// Poll wrapper which automatically restarts on EINTR and EAGAIN.
//
// fds, nfds and timeout are passed to poll(2); timeout is in milliseconds,
// 0 returns immediately and a negative value blocks indefinitely. stream_type
// is only consulted on macOS, where polls on named pipes (BUFIO_FIFO) are
// split into short periods (see the workaround below); on other platforms it
// is ignored.
//
// Returns the result of the last poll call: the number of ready descriptors,
// 0 on timeout, or -1 with errno set by poll.
static inline int safe_poll(struct pollfd fds[], nfds_t nfds, int timeout, bufio_stream_type stream_type)
{
  // TODO: Automatically decrement timeout, use ppoll on Linux
  int rc;
  int saved_errno;
  int i = 0;
  int num_loops = 0;  // Extra polls after a timed-out poll; -1 = repeat forever
#ifdef __MACH__
  if (stream_type == BUFIO_FIFO && timeout != 0) {
    // Work around macOS returning 0 for named pipes even though data arrives
    // within the timeout. This bug exists at least up to macOS Sonoma 14.7. We
    // could simply call poll with timeout = 0 after hitting the timeout. To
    // reduce the stuttering we split the timeout into periods of at most 20 ms
    // and repeat poll as many times as requested (indefinitely for timeout < 0).
    const int max_poll_period_msec = 20;
    if (timeout < 0) {
      // Poll indefinitely in short steps
      num_loops = -1;
      timeout = max_poll_period_msec;
    } else if (timeout < 2 * max_poll_period_msec) {
      // Split interval into 2 polls
      num_loops = 1;
      if (timeout > 1)
        timeout /= 2;
    } else {
      // Loop as many times as needed in short steps
      num_loops = timeout / max_poll_period_msec;
      timeout = max_poll_period_msec;
    }
  }
#else
  (void) stream_type;
#endif

  do {
    rc = poll(fds, nfds, timeout);
    // Capture errno right away: the debug output below may overwrite it
    // before the loop condition (or the caller) gets to look at it
    saved_errno = errno;
    debug_print("rc=%d, i=%d, nl=%d, timeout=%d, events=%d, revents=%d, error=%s", rc, i, num_loops, timeout, fds[0].events, fds[0].revents, rc == -1 ? strerror(saved_errno): "none");
    errno = saved_errno;
    // Repeat after an interrupted poll, or after a timed-out slice while
    // slices remain; num_loops < 0 means there is no limit
  } while (((rc == -1) && (saved_errno == EINTR || saved_errno == EAGAIN)) ||
           (rc == 0 && (num_loops < 0 || i++ < num_loops)));

  return rc;
}
Expand Down Expand Up @@ -439,7 +468,7 @@ static int accept_socket(bufio_stream *stream, int timeout, const char* info)
poll_in.fd = stream->fd;
poll_in.events = POLLIN;

int rc = safe_poll(&poll_in, 1, timeout);
int rc = safe_poll(&poll_in, 1, timeout, stream->type);
if (rc == 0) {
logstring(info, "listen timeout");
return 0;
Expand All @@ -464,7 +493,7 @@ static int accept_socket(bufio_stream *stream, int timeout, const char* info)
ignore_sigpipe(stream->fd);

// Enable non-blocking I/O
fcntl(stream->fd, F_SETFL, (long) (O_RDWR | O_NONBLOCK));
fcntl(stream->fd, F_SETFL, O_RDWR | O_NONBLOCK);

loginetadr(info, "connection established", sa, client_address.sin_port);

Expand Down Expand Up @@ -504,7 +533,9 @@ which are always bidirectional. Also, standard streams (stdin, stdout) are
unidirectional. If required, files are created with rw-rw-r--.

timeout specifies the time to wait for a connection in milliseconds. Specify
-1 to block indefinitely.
-1 to block indefinitely. If the target is a named pipe (created with mkfifo)
and mode is "w", bufio_open waits up to this amount of time for a reader to
attach to the pipe.

bufsize specifies the buffer size in Byte. If 0 a default value will be used.

Expand Down Expand Up @@ -630,8 +661,10 @@ application code does not crash during writes to a broken pipe.
stream->type = BUFIO_PIPE; // TODO: Restructure code
if (stream->mode & O_WRONLY) {
stream->fd = STDOUT_FILENO; // Write-only
fcntl(stream->fd, F_SETFL, O_WRONLY | O_NONBLOCK);
} else if ((stream->mode & O_RDWR) == 0) {
stream->fd = STDIN_FILENO; // Read-only
fcntl(stream->fd, F_SETFL, O_NONBLOCK);
} else {
// Read/write
log2string(info, "invalid mode", opt, "for standard stream");
Expand All @@ -657,11 +690,27 @@ application code does not crash during writes to a broken pipe.
} else if (!stat_rc && S_ISFIFO(statbuf.st_mode)) {
// TODO: LOCKEDFIFO?
stream->type = BUFIO_FIFO;
if (timeout < 0)
timeout = -1;
else if (timeout > 0 && timeout < 50)
timeout = 50;
}

// Open file
mode_t file_flags = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
if ((stream->fd = open(name, stream->mode, file_flags)) == -1) {
while ((stream->fd = open(name, stream->mode, file_flags)) == -1) {
if (stream->type != BUFIO_FIFO || !(stream->mode & O_WRONLY) || (timeout >= 0 && timeout < 50))
break;

assert(stream->type == BUFIO_FIFO && errno == ENXIO);

// For a writer on namedpipe, wait for a reading process until the timeout is reached
usleep(50000);
if (timeout != -1)
timeout -= 50;
}

if (stream->fd == -1) {
log2string(info, "failed to open file with mode", opt, name);
goto free_and_out;
}
Expand All @@ -672,6 +721,9 @@ application code does not crash during writes to a broken pipe.
goto close_free_and_out;
}

if (stream->type == BUFIO_FIFO || stream->type == BUFIO_PIPE)
ignore_sigpipe(stream->fd);

return stream;
}

Expand Down Expand Up @@ -773,7 +825,7 @@ application code does not crash during writes to a broken pipe.
}

// Enable non-blocking I/O
fcntl(stream->fd, F_SETFL, (long) (O_RDWR | O_NONBLOCK));
fcntl(stream->fd, F_SETFL, O_RDWR | O_NONBLOCK);

if (bufio_set_buffer(stream, bufsize > 0 ? bufsize : BUFIO_BUFSIZE) != 0) {
logstring(info, "can not create buffer");
Expand Down Expand Up @@ -906,6 +958,7 @@ and the status code of the stream was set.
read_vec[0].iov_base = (char *) ptr + (size - remaining_bytes);
read_vec[0].iov_len = remaining_bytes;

errno = 0;
ssize_t nbytes = readv(stream->fd, read_vec, 2);
if (nbytes == -1) {
if (errno == EAGAIN || errno == EINTR)
Expand All @@ -918,8 +971,19 @@ and the status code of the stream was set.
return size - remaining_bytes;
}

if (nbytes == 0 && poll_in.revents & POLLIN) {
// Read returns 0 to indicate EOF for files and when no writer is attached
// to a named pipe ("fifo") - or an anonymous pipe (on macOS only!); see pipe(7)
if (nbytes == 0 &&
((poll_in.revents & POLLIN) || stream->type == BUFIO_FIFO)) {
#ifdef __MACH__
if (stream->type == BUFIO_PIPE) {
debug_print("pipe error with %zu remaining bytes (%zu bytes requested)", remaining_bytes, size);
stream->status = BUFIO_EPIPE;
return size - remaining_bytes;
}
#endif
// Reached end-of-file
debug_print("eof with %zu remaining bytes (%zu bytes requested)", remaining_bytes, size);
stream->status = BUFIO_EOF;
bufio_release_read_lock(stream);
return size - remaining_bytes;
Expand All @@ -938,7 +1002,7 @@ and the status code of the stream was set.
remaining_bytes -= nbytes;
}
} while (remaining_bytes > 0 &&
(poll_rc = safe_poll(&poll_in, 1, stream->io_timeout_ms)) == 1 &&
(poll_rc = safe_poll(&poll_in, 1, stream->io_timeout_ms, stream->type)) == 1 &&
poll_in.revents & POLLIN);

bufio_release_read_lock(stream);
Expand All @@ -951,7 +1015,7 @@ and the status code of the stream was set.
else if (poll_in.revents & POLLERR)
stream->status = -EIO; // EIO comes closest to "an exceptional condition"
else if (poll_rc == 0) {
debug_print("timeout with %zu remaining bytes (%zu bytes requested)", remaining_bytes, size);
debug_print("timeout with %zu remaining bytes (%zu bytes requested, timeout=%d)", remaining_bytes, size, stream->io_timeout_ms);
stream->status = BUFIO_TIMEDOUT;
}

Expand Down Expand Up @@ -1052,7 +1116,7 @@ error has occured and the status code of the stream was set.
stream->write_lock_offset += nbytes;
ptr = (char *) ptr + nbytes;
} while (remaining_bytes > 0 &&
(poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms)) == 1 &&
(poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms, stream->type)) == 1 &&
poll_out.revents == POLLOUT);

if (remaining_bytes == 0)
Expand Down Expand Up @@ -1120,7 +1184,7 @@ error has occured and the status code of the stream was set.
remaining_bytes -= nbytes;
}
} while (remaining_bytes > 0 &&
(poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms)) == 1 &&
(poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms, stream->type)) == 1 &&
poll_out.revents == POLLOUT);

if (remaining_bytes == 0)
Expand Down Expand Up @@ -1207,7 +1271,7 @@ of the stream was set.
output_buffer_head += nbytes;
stream->write_lock_offset += nbytes;
} while (output_buffer_head != stream->output_buffer_tail &&
(poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms)) == 1 &&
(poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms, stream->type)) == 1 &&
poll_out.revents == POLLOUT);

bufio_release_write_lock(stream);
Expand Down Expand Up @@ -1367,10 +1431,10 @@ input buffers. If the value of timeout is -1, the poll blocks indefinitely.
return -1; // Stream error
}

// When trying a non-blocking read on a TCP connection in CLOSE_WAIT state,
// When trying a non-blocking read on a TCP connection in CLOSE_WAIT state or on a pipe where the writer hung up
// - macOS yields 0 bytes and ETIMEDOUT, while
// - Linux yields 0 bytes and EAGAIN.
if (stream->type == BUFIO_SOCKET && nbytes == 0 && (read_errno == ETIMEDOUT || read_errno == EAGAIN)) {
if ((stream->type == BUFIO_PIPE || stream->type == BUFIO_SOCKET) && nbytes == 0 && (read_errno == ETIMEDOUT || read_errno == EAGAIN)) {
stream->status = BUFIO_EPIPE;
return -1; // Stream error
}
Expand All @@ -1393,7 +1457,7 @@ input buffers. If the value of timeout is -1, the poll blocks indefinitely.
poll_in.events = POLLIN;
poll_in.revents = 0;

int rc = safe_poll(&poll_in, 1, timeout);
int rc = safe_poll(&poll_in, 1, timeout, stream->type);
if (rc == 0) {
stream->status = BUFIO_TIMEDOUT;
return 0; // Timeout
Expand Down
76 changes: 76 additions & 0 deletions tests/bufio_test_namedpipe.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#ifdef __linux__
#define _DEFAULT_SOURCE
#define _BSD_SOURCE
#define _POSIX_C_SOURCE 200809L
#else
#undef _POSIX_C_SOURCE
#endif

#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/time.h>
#include <stdlib.h>
#include <unistd.h>

#include "bufio.h"
#include "test.h"


// Seconds elapsed between two gettimeofday() samples.
static double elapsed_seconds(const struct timeval *start, const struct timeval *end)
{
  return (double) (end->tv_sec - start->tv_sec) +
         1e-6 * (double) (end->tv_usec - start->tv_usec);
}


// Exercises bufio on a named pipe: a writer open that times out because no
// reader is attached, a writer/reader pair exchanging data, and a lone reader
// that observes EOF.
//
// NOTE(review): the calls wrapped in assert() have side effects, so this test
// presumably relies on test.h providing an assert that is never compiled
// out - confirm.
int main(void)
{
  const char *const fifo = "test_bufio_namedpipe.fifo";
  const char *const info = "bufio_test_namedpipe";
  char payload[4] = {'p', 'i', 'n', 'g'};
  char buf[16] = {0};  // Zeroed so that no indeterminate bytes are ever used
  struct timeval before, after;

  // Remove leftovers of a previous run; the result is not checked because the
  // fifo normally does not exist at this point
  unlink(fifo);
  assert(mkfifo(fifo, S_IRUSR | S_IWUSR) == 0);

  // Attempt to open a writer: this should fail after the timeout because no one is listening
  assert(gettimeofday(&before, NULL) == 0);
  bufio_stream *so = bufio_open(fifo, "w", 1000, 256, info);
  assert(so == NULL);
  assert(gettimeofday(&after, NULL) == 0);
  assert(elapsed_seconds(&before, &after) >= 1.0);

  // Open a reader
  bufio_stream *si = bufio_open(fifo, "r", 1000, 256, info);
  assert(si != NULL);

  // Open a writer: this should be much quicker than before
  assert(gettimeofday(&before, NULL) == 0);
  so = bufio_open(fifo, "w", 1000, 256, info);
  assert(so != NULL);
  assert(gettimeofday(&after, NULL) == 0);
  assert(elapsed_seconds(&before, &after) < 1.0);

  // Assert no initial data
  assert(bufio_read(si, buf, 16) == 0 && bufio_status(si) == BUFIO_TIMEDOUT);
  assert(bufio_wait(si, 0) == 0 && bufio_status(si) == BUFIO_TIMEDOUT);
  assert(bufio_wait(si, 100) == 0 && bufio_status(si) == BUFIO_TIMEDOUT);

  // Test read after write and check that the payload arrives unchanged
  assert(bufio_write(so, payload, 4) == 4 && bufio_flush(so) == 0);
  assert(bufio_read(si, buf, 4) == 4);
  for (size_t k = 0; k < sizeof payload; k++)
    assert(buf[k] == payload[k]);

  // Clean up
  assert(bufio_close(so) == 0);
  assert(bufio_close(si) == 0);
  assert(unlink(fifo) == 0);

  // Test again with just a reader
  assert(mkfifo(fifo, S_IRUSR | S_IWUSR) == 0);
  si = bufio_open(fifo, "r", 1000, 256, info);
  assert(si != NULL);

  // Assert no initial data and EOF (when there's no writer)
  assert(bufio_read(si, buf, 16) == 0 && bufio_status(si) == BUFIO_EOF);
  assert(bufio_wait(si, 0) == 0 && bufio_status(si) == BUFIO_EOF);
  assert(gettimeofday(&before, NULL) == 0);
  assert(bufio_wait(si, 1000) == 0 && bufio_status(si) == BUFIO_EOF);
  assert(gettimeofday(&after, NULL) == 0);
  assert(elapsed_seconds(&before, &after) >= 1.0);

  assert(unlink(fifo) == 0);

  return 0;
}
Loading
0