From 327ac04dd530388d147aa2ea01cf33e4ee18a8f1 Mon Sep 17 00:00:00 2001 From: Felix Werner Date: Fri, 9 Aug 2024 17:10:08 +0200 Subject: [PATCH 01/15] Test timeout behaviour when opening a named pipe without an attached reader. --- tests/bufio_test_namedpipe.c | 57 ++++++++++++++++++++++++++++++++++++ tests/meson.build | 2 ++ 2 files changed, 59 insertions(+) create mode 100644 tests/bufio_test_namedpipe.c diff --git a/tests/bufio_test_namedpipe.c b/tests/bufio_test_namedpipe.c new file mode 100644 index 0000000..c1f6225 --- /dev/null +++ b/tests/bufio_test_namedpipe.c @@ -0,0 +1,57 @@ +#ifdef __linux__ +#define _DEFAULT_SOURCE +#define _BSD_SOURCE +#define _POSIX_C_SOURCE 200809L +#else +#undef _POSIX_C_SOURCE +#endif + +#include +#include +#include +#include +#include +#include + +#include "bufio.h" +#include "test.h" + + +int main(void) +{ + char buf[16]; + + unlink("test_bufio_namedpipe.fifo"); + assert(mkfifo("test_bufio_namedpipe.fifo", S_IRUSR | S_IWUSR) == 0); + + // Attempt to open a writer: this should fail after the timeout because no one is listening + struct timeval before, after; + assert(gettimeofday(&before, NULL) == 0); + bufio_stream *so = bufio_open("test_bufio_namedpipe.fifo", "w", 1000, 256, "bufio_test_namedpipe"); + assert(so == NULL); + assert(gettimeofday(&after, NULL) == 0); + assert(after.tv_sec + 1e-6 * after.tv_usec - before.tv_sec - 1e-6 * before.tv_usec >= 1.0); + + // Open a reader + bufio_stream *si = bufio_open("test_bufio_namedpipe.fifo", "r", 1000, 256, "bufio_test_namedpipe"); + assert(si != NULL); + + // Open a writer: this should be much quicker than before + assert(gettimeofday(&before, NULL) == 0); + so = bufio_open("test_bufio_namedpipe.fifo", "w", 1000, 256, "bufio_test_namedpipe"); + assert(so != NULL); + assert(gettimeofday(&after, NULL) == 0); + assert(after.tv_sec + 1e-6 * after.tv_usec - before.tv_sec - 1e-6 * before.tv_usec < 1.0); + + // Assert no initial data + assert(bufio_read(si, buf, 16) == 0 && bufio_status(si) 
== BUFIO_TIMEDOUT); + assert(bufio_wait(si, 0) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + assert(bufio_wait(si, 100) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + + // Clean up + assert(bufio_close(so) == 0); + assert(bufio_close(si) == 0); + assert(unlink("test_bufio_namedpipe.fifo") == 0); + + return 0; +} diff --git a/tests/meson.build b/tests/meson.build index 02fceac..527fb9f 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -4,6 +4,7 @@ bufio_test_tcp_connect = executable('bufio_test_tcp_connect', 'bufio_test_tcp_co bufio_test_delayed_tcp_connect = executable('bufio_test_delayed_tcp_connect', 'bufio_test_delayed_tcp_connect.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_lockedfile = executable('bufio_test_lockedfile', 'bufio_test_lockedfile.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_file = executable('bufio_test_file', 'bufio_test_file.c', include_directories : bufio_inc, link_with : bufio_lib) +bufio_test_namedpipe = executable('bufio_test_namedpipe', 'bufio_test_namedpipe.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_wait_on_tcpclose_with_pending_data = executable('bufio_test_wait_on_tcpclose_with_pending_data', 'bufio_test_wait_on_tcpclose_with_pending_data.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_wait_on_tcpclose = executable('bufio_test_wait_on_tcpclose', 'bufio_test_wait_on_tcpclose.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_mem_interface = executable('bufio_test_mem_interface', 'bufio_test_mem_interface.c', include_directories : bufio_inc, link_with : bufio_lib) @@ -14,6 +15,7 @@ test('bufio_test_tcp_connect', bufio_test_tcp_connect, is_parallel : false, suit test('bufio_test_delayed_tcp_connect', bufio_test_delayed_tcp_connect, is_parallel : false, suite: ['all', 'default']) test('bufio_test_lockedfile', bufio_test_lockedfile, is_parallel : false, suite: ['all', 'default']) test('bufio_test_file', 
bufio_test_file, is_parallel : false, suite: ['all', 'default']) +test('bufio_test_namedpipe', bufio_test_namedpipe, is_parallel : false, suite: ['all', 'default']) test('bufio_test_mem_interface', bufio_test_mem_interface, is_parallel : false, suite: ['all', 'default']) test('bufio_test_wait_on_tcpclose', bufio_test_wait_on_tcpclose, is_parallel : false, timeout : 120, suite: ['all', 'extensive']) From 34a81d8902a15e8a9913acf5295ed0dd8f9cf2aa Mon Sep 17 00:00:00 2001 From: Felix Werner Date: Fri, 9 Aug 2024 17:18:34 +0200 Subject: [PATCH 02/15] When opening named pipe in write mode, wait until timeout is reached before failing. --- src/bufio.c | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/bufio.c b/src/bufio.c index 31a9112..afa074c 100644 --- a/src/bufio.c +++ b/src/bufio.c @@ -504,7 +504,9 @@ which are always bidirectional. Also, standard streams (stdin, stdout) are unidirectional. If required, files are created with rw-rw-r--. timeout specifies the time to wait for a connection in milliseconds. Specify --1 to block indefinitely. +-1 to block indefinitely. If the target is a named pipe (created with mkfifo) +and mode is "w", bufio_open waits this amount of time for a reader to attach +to the pipe. bufsize specifies the buffer size in Byte. If 0 a default value will be used. @@ -657,11 +659,27 @@ application code does not crash during writes to a broken pipe. } else if (!stat_rc && S_ISFIFO(statbuf.st_mode)) { // TODO: LOCKEDFIFO? 
stream->type = BUFIO_FIFO; + if (timeout < 0) + timeout = -1; + else if (timeout > 0 && timeout < 50) + timeout = 50; } // Open file mode_t file_flags = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH; - if ((stream->fd = open(name, stream->mode, file_flags)) == -1) { + while ((stream->fd = open(name, stream->mode, file_flags)) == -1) { + if (stream->type != BUFIO_FIFO || !(stream->mode & O_WRONLY) || errno != ENXIO || (timeout >= 0 && timeout < 50)) + break; + + // Only ENXIO (no reader attached to the FIFO yet) is retried; any other open() error fails right away + + // For a writer on a named pipe, wait for a reading process until the timeout is reached + usleep(50000); + if (timeout != -1) + timeout -= 50; + } + + if (stream->fd == -1) { log2string(info, "failed to open file with mode", opt, name); goto free_and_out; } From 9eee26fa403f85bf70828561f45ed4ab6a21203a Mon Sep 17 00:00:00 2001 From: Felix Werner Date: Fri, 9 Aug 2024 18:02:17 +0200 Subject: [PATCH 03/15] Test stream status after bufio_read/wait on a named pipe when no writer is attached. --- tests/bufio_test_namedpipe.c | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/bufio_test_namedpipe.c b/tests/bufio_test_namedpipe.c index c1f6225..554ea01 100644 --- a/tests/bufio_test_namedpipe.c +++ b/tests/bufio_test_namedpipe.c @@ -48,10 +48,24 @@ int main(void) assert(bufio_wait(si, 0) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); assert(bufio_wait(si, 100) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + // Test read after write + assert(bufio_write(so, buf, 4) == 4 && bufio_flush(so) == 0); + assert(bufio_read(si, buf, 4) == 4); + // Clean up assert(bufio_close(so) == 0); assert(bufio_close(si) == 0); assert(unlink("test_bufio_namedpipe.fifo") == 0); + // Test again with just a reader + assert(mkfifo("test_bufio_namedpipe.fifo", S_IRUSR | S_IWUSR) == 0); + si = bufio_open("test_bufio_namedpipe.fifo", "r", 1000, 256, "bufio_test_namedpipe"); + assert(si != NULL); + + // Assert no initial data and timeout (not EOF, even when there's no writer) +
assert(bufio_read(si, buf, 16) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + assert(bufio_wait(si, 0) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + assert(bufio_wait(si, 100) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + return 0; } From 271899e19babb6b4d0d8d65920149fd9c1a3a441 Mon Sep 17 00:00:00 2001 From: Felix Werner Date: Fri, 9 Aug 2024 18:15:16 +0200 Subject: [PATCH 04/15] Document wanted behaviour. --- tests/bufio_test_namedpipe.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/bufio_test_namedpipe.c b/tests/bufio_test_namedpipe.c index 554ea01..49a3fa9 100644 --- a/tests/bufio_test_namedpipe.c +++ b/tests/bufio_test_namedpipe.c @@ -63,9 +63,12 @@ int main(void) assert(si != NULL); // Assert no initial data and timeout (not EOF, even when there's no writer) - assert(bufio_read(si, buf, 16) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); - assert(bufio_wait(si, 0) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); - assert(bufio_wait(si, 100) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + assert(bufio_read(si, buf, 16) == 0 && bufio_status(si) == BUFIO_EOF); + assert(bufio_wait(si, 0) == 0 && bufio_status(si) == BUFIO_EOF); + assert(gettimeofday(&before, NULL) == 0); + assert(bufio_wait(si, 1000) == 0 && bufio_status(si) == BUFIO_EOF); + assert(gettimeofday(&after, NULL) == 0); + assert(after.tv_sec + 1e-6 * after.tv_usec - before.tv_sec - 1e-6 * before.tv_usec >= 1.0); return 0; } From 66533d99bb16f7948fb3e6de105576db741af6fe Mon Sep 17 00:00:00 2001 From: Felix Werner Date: Mon, 19 Aug 2024 17:52:20 +0200 Subject: [PATCH 05/15] Add test for EOF behaviour of pipes. 
--- tests/bufio_test_namedpipe.c | 2 +- tests/bufio_test_pipe.c | 82 ++++++++++++++++++++++++++++++++++++ tests/meson.build | 2 + 3 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 tests/bufio_test_pipe.c diff --git a/tests/bufio_test_namedpipe.c b/tests/bufio_test_namedpipe.c index 49a3fa9..63825d9 100644 --- a/tests/bufio_test_namedpipe.c +++ b/tests/bufio_test_namedpipe.c @@ -62,7 +62,7 @@ int main(void) si = bufio_open("test_bufio_namedpipe.fifo", "r", 1000, 256, "bufio_test_namedpipe"); assert(si != NULL); - // Assert no initial data and timeout (not EOF, even when there's no writer) + // Assert no initial data and EOF (when there's no writer) assert(bufio_read(si, buf, 16) == 0 && bufio_status(si) == BUFIO_EOF); assert(bufio_wait(si, 0) == 0 && bufio_status(si) == BUFIO_EOF); assert(gettimeofday(&before, NULL) == 0); diff --git a/tests/bufio_test_pipe.c b/tests/bufio_test_pipe.c new file mode 100644 index 0000000..0b13424 --- /dev/null +++ b/tests/bufio_test_pipe.c @@ -0,0 +1,82 @@ +#ifdef __linux__ +#define _DEFAULT_SOURCE +#define _BSD_SOURCE +#define _POSIX_C_SOURCE 200809L +#else +#undef _POSIX_C_SOURCE +#endif + +#include +#include +#include +#include +#include +#include + +#include "bufio.h" +#include "test.h" + + +int main(void) +{ + char buf[16]; + + // Create pipe + int p[2]; + assert(pipe(p) == 0); + assert(close(STDIN_FILENO) == 0); + assert(close(STDOUT_FILENO) == 0); + + FORK_CHILD + // Redirect pipe to stdin + assert(close(p[1]) == 0); + assert(dup2(p[0], STDIN_FILENO) == STDIN_FILENO); + + // Open a reader on stdin + bufio_stream *si = bufio_open("-", "r", 100, 256, "bufio_test_pipe"); + assert(si != NULL); + + // Assert no initial data + assert(bufio_read(si, buf, 16) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + assert(bufio_wait(si, 0) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + assert(bufio_wait(si, 100) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + + sleep(1); + + // Test read after write + assert(bufio_read(si, buf, 
4) == 4); + + sleep(2); + + // Assert EOF after writer hung up + struct timeval before, after; + assert(bufio_read(si, buf, 16) == 0 && bufio_status(si) == BUFIO_EOF); + assert(bufio_wait(si, 0) == 0 && bufio_status(si) == BUFIO_EOF); + assert(gettimeofday(&before, NULL) == 0); + assert(bufio_wait(si, 1000) == 0 && bufio_status(si) == BUFIO_EOF); + assert(gettimeofday(&after, NULL) == 0); + assert(after.tv_sec + 1e-6 * after.tv_usec - before.tv_sec - 1e-6 * before.tv_usec >= 1.0); + + assert(bufio_close(si) == 0); + + FORK_PARENT + // Redirect stdout to pipe + assert(close(p[0]) == 0); + assert(dup2(p[1], STDOUT_FILENO) == STDOUT_FILENO); + + // Open a writer to stdout + bufio_stream *so = bufio_open("-", "w", 100, 256, "bufio_test_pipe"); + assert(so != NULL); + + // Test read after write + sleep(1); + assert(bufio_write(so, buf, 4) == 4 && bufio_flush(so) == 0); + + // Close writer (and the duplicate fd) + sleep(1); + assert(bufio_close(so) == 0); + close(p[1]); + + FORK_JOIN + return 0; +} diff --git a/tests/meson.build b/tests/meson.build index 527fb9f..330184e 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -4,6 +4,7 @@ bufio_test_tcp_connect = executable('bufio_test_tcp_connect', 'bufio_test_tcp_co bufio_test_delayed_tcp_connect = executable('bufio_test_delayed_tcp_connect', 'bufio_test_delayed_tcp_connect.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_lockedfile = executable('bufio_test_lockedfile', 'bufio_test_lockedfile.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_file = executable('bufio_test_file', 'bufio_test_file.c', include_directories : bufio_inc, link_with : bufio_lib) +bufio_test_pipe = executable('bufio_test_pipe', 'bufio_test_pipe.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_namedpipe = executable('bufio_test_namedpipe', 'bufio_test_namedpipe.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_wait_on_tcpclose_with_pending_data = 
executable('bufio_test_wait_on_tcpclose_with_pending_data', 'bufio_test_wait_on_tcpclose_with_pending_data.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_wait_on_tcpclose = executable('bufio_test_wait_on_tcpclose', 'bufio_test_wait_on_tcpclose.c', include_directories : bufio_inc, link_with : bufio_lib) @@ -15,6 +16,7 @@ test('bufio_test_tcp_connect', bufio_test_tcp_connect, is_parallel : false, suit test('bufio_test_delayed_tcp_connect', bufio_test_delayed_tcp_connect, is_parallel : false, suite: ['all', 'default']) test('bufio_test_lockedfile', bufio_test_lockedfile, is_parallel : false, suite: ['all', 'default']) test('bufio_test_file', bufio_test_file, is_parallel : false, suite: ['all', 'default']) +test('bufio_test_pipe', bufio_test_pipe, is_parallel : false, suite: ['all', 'default']) test('bufio_test_namedpipe', bufio_test_namedpipe, is_parallel : false, suite: ['all', 'default']) test('bufio_test_mem_interface', bufio_test_mem_interface, is_parallel : false, suite: ['all', 'default']) From 3d1571e39277f7ecafca4982def72d7722368cc9 Mon Sep 17 00:00:00 2001 From: Felix Werner Date: Mon, 19 Aug 2024 17:52:42 +0200 Subject: [PATCH 06/15] Fix EOF-behaviour of anonymous and named pipes. --- src/bufio.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/bufio.c b/src/bufio.c index afa074c..9f75505 100644 --- a/src/bufio.c +++ b/src/bufio.c @@ -632,8 +632,10 @@ application code does not crash during writes to a broken pipe. stream->type = BUFIO_PIPE; // TODO: Restructure code if (stream->mode & O_WRONLY) { stream->fd = STDOUT_FILENO; // Write-only + fcntl(stream->fd, F_SETFL, (long)(O_WRONLY | O_NONBLOCK)); } else if ((stream->mode & O_RDWR) == 0) { stream->fd = STDIN_FILENO; // Read-only + fcntl(stream->fd, F_SETFL, (long)(O_NONBLOCK)); } else { // Read/write log2string(info, "invalid mode", opt, "for standard stream"); @@ -936,7 +938,10 @@ and the status code of the stream was set. 
return size - remaining_bytes; } - if (nbytes == 0 && poll_in.revents & POLLIN) { + // Read returns 0 to indicate EOF for files and when no writer is attached + // to an anonymous or named pipe ("fifo"); see pipe(7) + if (nbytes == 0 && + ((poll_in.revents & POLLIN) || stream->type == BUFIO_FIFO)) { // Reached end-of-file stream->status = BUFIO_EOF; bufio_release_read_lock(stream); From a7a2fa937fe4efa08a52c2421c270c5316ea235c Mon Sep 17 00:00:00 2001 From: Felix Werner Date: Wed, 21 Aug 2024 14:24:16 +0200 Subject: [PATCH 07/15] Remove unneeded cast of fcntl arg. --- src/bufio.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/bufio.c b/src/bufio.c index 9f75505..cb00b20 100644 --- a/src/bufio.c +++ b/src/bufio.c @@ -464,7 +464,7 @@ static int accept_socket(bufio_stream *stream, int timeout, const char* info) ignore_sigpipe(stream->fd); // Enable non-blocking I/O - fcntl(stream->fd, F_SETFL, (long) (O_RDWR | O_NONBLOCK)); + fcntl(stream->fd, F_SETFL, O_RDWR | O_NONBLOCK); loginetadr(info, "connection established", sa, client_address.sin_port); @@ -632,10 +632,10 @@ application code does not crash during writes to a broken pipe. stream->type = BUFIO_PIPE; // TODO: Restructure code if (stream->mode & O_WRONLY) { stream->fd = STDOUT_FILENO; // Write-only - fcntl(stream->fd, F_SETFL, (long)(O_WRONLY | O_NONBLOCK)); + fcntl(stream->fd, F_SETFL, O_WRONLY | O_NONBLOCK); } else if ((stream->mode & O_RDWR) == 0) { stream->fd = STDIN_FILENO; // Read-only - fcntl(stream->fd, F_SETFL, (long)(O_NONBLOCK)); + fcntl(stream->fd, F_SETFL, O_NONBLOCK); } else { // Read/write log2string(info, "invalid mode", opt, "for standard stream"); @@ -793,7 +793,7 @@ application code does not crash during writes to a broken pipe. } // Enable non-blocking I/O - fcntl(stream->fd, F_SETFL, (long) (O_RDWR | O_NONBLOCK)); + fcntl(stream->fd, F_SETFL, O_RDWR | O_NONBLOCK); if (bufio_set_buffer(stream, bufsize > 0 ? 
bufsize : BUFIO_BUFSIZE) != 0) { logstring(info, "can not create buffer"); @@ -943,6 +943,7 @@ and the status code of the stream was set. if (nbytes == 0 && ((poll_in.revents & POLLIN) || stream->type == BUFIO_FIFO)) { // Reached end-of-file + debug_print("eof with %zu remaining bytes (%zu bytes requested)", remaining_bytes, size); stream->status = BUFIO_EOF; bufio_release_read_lock(stream); return size - remaining_bytes; From 857365045817cbd131c6e994366124c93dcc5d50 Mon Sep 17 00:00:00 2001 From: Felix Werner Date: Wed, 21 Aug 2024 14:24:37 +0200 Subject: [PATCH 08/15] Clean up fifo on success. --- tests/bufio_test_namedpipe.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/bufio_test_namedpipe.c b/tests/bufio_test_namedpipe.c index 63825d9..0a5465e 100644 --- a/tests/bufio_test_namedpipe.c +++ b/tests/bufio_test_namedpipe.c @@ -70,5 +70,7 @@ int main(void) assert(gettimeofday(&after, NULL) == 0); assert(after.tv_sec + 1e-6 * after.tv_usec - before.tv_sec - 1e-6 * before.tv_usec >= 1.0); + assert(unlink("test_bufio_namedpipe.fifo") == 0); + return 0; } From 94bce1f7048dc0f5b27677be32111185c3f38c8a Mon Sep 17 00:00:00 2001 From: Felix Werner Date: Wed, 21 Aug 2024 14:24:57 +0200 Subject: [PATCH 09/15] Add asynchronous test for named pipes. 
--- tests/bufio_test_namedpipe_async.c | 61 ++++++++++++++++++++++++++++++ tests/meson.build | 2 + 2 files changed, 63 insertions(+) create mode 100644 tests/bufio_test_namedpipe_async.c diff --git a/tests/bufio_test_namedpipe_async.c b/tests/bufio_test_namedpipe_async.c new file mode 100644 index 0000000..8eb1edc --- /dev/null +++ b/tests/bufio_test_namedpipe_async.c @@ -0,0 +1,61 @@ +#ifdef __linux__ +#define _DEFAULT_SOURCE +#define _BSD_SOURCE +#define _POSIX_C_SOURCE 200809L +#else +#undef _POSIX_C_SOURCE +#endif + +#include +#include +#include +#include +#include +#include + +#include "bufio.h" +#include "test.h" + + +int main(void) +{ + char buf[16]; + + unlink("test_bufio_namedpipe.fifo"); + assert(mkfifo("test_bufio_namedpipe.fifo", S_IRUSR | S_IWUSR) == 0); + + FORK_CHILD + // Open a reader + bufio_stream *si = bufio_open("test_bufio_namedpipe.fifo", "r", 1000, 256, "bufio_test_namedpipe"); + assert(si != NULL); + + // Wait until writer is connected + bufio_timeout(si, 1000); + size_t nread = 0; + while ((nread = bufio_read(si, buf, 16)) == 0) + assert(bufio_status(si) == BUFIO_EOF); + + assert(nread == 16); + + // Clean up + assert(bufio_close(si) == 0); + + FORK_PARENT + // Open a writer - this waits until a reader is present + bufio_stream *so = bufio_open("test_bufio_namedpipe.fifo", "w", 2000, 256, "bufio_test_namedpipe"); + assert(so != NULL); + + // Delay a bit such that bufio_read enters poll() + usleep(100000); + + // Write something + assert(bufio_write(so, buf, 16) == 16 && bufio_flush(so) == 0); + + // Clean up + assert(bufio_close(so) == 0); + + FORK_JOIN + + assert(unlink("test_bufio_namedpipe.fifo") == 0); + return 0; +} diff --git a/tests/meson.build b/tests/meson.build index 330184e..7c49f98 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -6,6 +6,7 @@ bufio_test_lockedfile = executable('bufio_test_lockedfile', 'bufio_test_lockedfi bufio_test_file = executable('bufio_test_file', 'bufio_test_file.c', include_directories : 
bufio_inc, link_with : bufio_lib) bufio_test_pipe = executable('bufio_test_pipe', 'bufio_test_pipe.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_namedpipe = executable('bufio_test_namedpipe', 'bufio_test_namedpipe.c', include_directories : bufio_inc, link_with : bufio_lib) +bufio_test_namedpipe_async = executable('bufio_test_namedpipe_async', 'bufio_test_namedpipe_async.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_wait_on_tcpclose_with_pending_data = executable('bufio_test_wait_on_tcpclose_with_pending_data', 'bufio_test_wait_on_tcpclose_with_pending_data.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_wait_on_tcpclose = executable('bufio_test_wait_on_tcpclose', 'bufio_test_wait_on_tcpclose.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_mem_interface = executable('bufio_test_mem_interface', 'bufio_test_mem_interface.c', include_directories : bufio_inc, link_with : bufio_lib) @@ -18,6 +19,7 @@ test('bufio_test_lockedfile', bufio_test_lockedfile, is_parallel : false, suite: test('bufio_test_file', bufio_test_file, is_parallel : false, suite: ['all', 'default']) test('bufio_test_pipe', bufio_test_pipe, is_parallel : false, suite: ['all', 'default']) test('bufio_test_namedpipe', bufio_test_namedpipe, is_parallel : false, suite: ['all', 'default']) +test('bufio_test_namedpipe_async', bufio_test_namedpipe_async, is_parallel : false, suite: ['all', 'default']) test('bufio_test_mem_interface', bufio_test_mem_interface, is_parallel : false, suite: ['all', 'default']) test('bufio_test_wait_on_tcpclose', bufio_test_wait_on_tcpclose, is_parallel : false, timeout : 120, suite: ['all', 'extensive']) From 203067451c67e2b8e59e83b77a67edd37f92d8de Mon Sep 17 00:00:00 2001 From: Felix Werner Date: Wed, 21 Aug 2024 14:45:44 +0200 Subject: [PATCH 10/15] Fix expectations in bufio_test_pipe: an anonymous pipe cannot expect new data after the writer hung up! 
Fix macOS return codes for that case. --- src/bufio.c | 15 ++++++++++++++- tests/bufio_test_pipe.c | 8 ++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/bufio.c b/src/bufio.c index cb00b20..96d323a 100644 --- a/src/bufio.c +++ b/src/bufio.c @@ -939,9 +939,16 @@ and the status code of the stream was set. } // Read returns 0 to indicate EOF for files and when no writer is attached - // to an anonymous or named pipe ("fifo"); see pipe(7) + // to a named pipe ("fifo") - or an anonymous pipe (on macOS only!); see pipe(7) if (nbytes == 0 && ((poll_in.revents & POLLIN) || stream->type == BUFIO_FIFO)) { +#ifdef __MACH__ + if (stream->type == BUFIO_PIPE) { + debug_print("pipe error with %zu remaining bytes (%zu bytes requested)", remaining_bytes, size); + stream->status = BUFIO_EPIPE; + return size - remaining_bytes; + } +#endif // Reached end-of-file debug_print("eof with %zu remaining bytes (%zu bytes requested)", remaining_bytes, size); stream->status = BUFIO_EOF; @@ -1391,6 +1398,12 @@ input buffers. If the value of timeout is -1, the poll blocks indefinitely. return -1; // Stream error } + // When trying a non-blocking read on a pipe where the writer hung up, macOS yields 0 bytes and ETIMEDOUT + if (stream->type == BUFIO_PIPE && nbytes == 0 && read_errno == ETIMEDOUT) { + stream->status = BUFIO_EPIPE; + return -1; // Stream error + } + // When trying a non-blocking read on a TCP connection in CLOSE_WAIT state, // - macOS yields 0 bytes and ETIMEDOUT, while // - Linux yields 0 bytes and EAGAIN. 
diff --git a/tests/bufio_test_pipe.c b/tests/bufio_test_pipe.c index 0b13424..ac67653 100644 --- a/tests/bufio_test_pipe.c +++ b/tests/bufio_test_pipe.c @@ -50,12 +50,12 @@ int main(void) // Assert EOF after writer hung up struct timeval before, after; - assert(bufio_read(si, buf, 16) == 0 && bufio_status(si) == BUFIO_EOF); - assert(bufio_wait(si, 0) == 0 && bufio_status(si) == BUFIO_EOF); + assert(bufio_read(si, buf, 16) == 0 && bufio_status(si) == BUFIO_EPIPE); + assert(bufio_wait(si, 0) == -1 && bufio_status(si) == BUFIO_EPIPE); assert(gettimeofday(&before, NULL) == 0); - assert(bufio_wait(si, 1000) == 0 && bufio_status(si) == BUFIO_EOF); + assert(bufio_wait(si, 1000) == -1 && bufio_status(si) == BUFIO_EPIPE); assert(gettimeofday(&after, NULL) == 0); - assert(after.tv_sec + 1e-6 * after.tv_usec - before.tv_sec - 1e-6 * before.tv_usec >= 1.0); + assert(after.tv_sec + 1e-6 * after.tv_usec - before.tv_sec - 1e-6 * before.tv_usec < 1.0); assert(bufio_close(si) == 0); From 1f0347ce721cd98b4233624dc1583e35ddab5dcf Mon Sep 17 00:00:00 2001 From: Felix Werner Date: Wed, 21 Aug 2024 14:49:25 +0200 Subject: [PATCH 11/15] Aha - there's a pattern! macOS and Linux behave the same (different) ways for TCP connections in CLOSE_WAIT and pipes where the writer hung up. Neat. --- src/bufio.c | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/bufio.c b/src/bufio.c index 96d323a..1e4dc62 100644 --- a/src/bufio.c +++ b/src/bufio.c @@ -1398,16 +1398,10 @@ input buffers. If the value of timeout is -1, the poll blocks indefinitely. 
return -1; // Stream error } - // When trying a non-blocking read on a pipe where the writer hung up, macOS yields 0 bytes and ETIMEDOUT - if (stream->type == BUFIO_PIPE && nbytes == 0 && read_errno == ETIMEDOUT) { - stream->status = BUFIO_EPIPE; - return -1; // Stream error - } - - // When trying a non-blocking read on a TCP connection in CLOSE_WAIT state, + // When trying a non-blocking read on a TCP connection in CLOSE_WAIT state or on a pipe where the writer hung up // - macOS yields 0 bytes and ETIMEDOUT, while // - Linux yields 0 bytes and EAGAIN. - if (stream->type == BUFIO_SOCKET && nbytes == 0 && (read_errno == ETIMEDOUT || read_errno == EAGAIN)) { + if ((stream->type == BUFIO_PIPE || stream->type == BUFIO_SOCKET) && nbytes == 0 && (read_errno == ETIMEDOUT || read_errno == EAGAIN)) { stream->status = BUFIO_EPIPE; return -1; // Stream error } From 7baf1ac47a7fd98fd15f76980f53a47954310341 Mon Sep 17 00:00:00 2001 From: Felix Werner Date: Fri, 11 Oct 2024 09:28:05 +0200 Subject: [PATCH 12/15] Add another test for named pipes which reveals a bug in poll on macOS. 
--- tests/bufio_test_namedpipe_async.c | 32 ++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/bufio_test_namedpipe_async.c b/tests/bufio_test_namedpipe_async.c index 8eb1edc..58b2add 100644 --- a/tests/bufio_test_namedpipe_async.c +++ b/tests/bufio_test_namedpipe_async.c @@ -56,6 +56,38 @@ int main(void) FORK_JOIN + // Test poll - which requires a workaround on macOS + FORK_CHILD + // Open a reader + bufio_stream *si = bufio_open("test_bufio_namedpipe.fifo", "r", 1000, 256, "bufio_test_namedpipe"); + assert(si != NULL); + + // Attempt to read immediately - which will enter poll() + assert(bufio_read(si, buf, 16) == 16); + + // Clean up + assert(bufio_close(si) == 0); + + FORK_PARENT + // Open a writer - this waits until a reader is present + bufio_stream *so = bufio_open("test_bufio_namedpipe.fifo", "w", 2000, 256, "bufio_test_namedpipe"); + assert(so != NULL); + + // Delay a bit such that bufio_read enters poll() + usleep(100000); + + // Write something + assert(bufio_write(so, buf, 16) == 16 && bufio_flush(so) == 0); + + // Keep open until the reader would time out (if poll were broken) + usleep(1500000); + + // Clean up + assert(bufio_close(so) == 0); + + FORK_JOIN + + assert(unlink("test_bufio_namedpipe.fifo") == 0); return 0; } From 4fe0b1637e6dba52c9a8811d5591513f7857b1cb Mon Sep 17 00:00:00 2001 From: Felix Werner Date: Fri, 11 Oct 2024 12:15:58 +0200 Subject: [PATCH 13/15] Fix testing the wrong thing. Does not reproduce the macOS bug seen when reading a stream from zstd -d. Mmmh... 
--- tests/bufio_test_namedpipe_async.c | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/tests/bufio_test_namedpipe_async.c b/tests/bufio_test_namedpipe_async.c index 58b2add..0565258 100644 --- a/tests/bufio_test_namedpipe_async.c +++ b/tests/bufio_test_namedpipe_async.c @@ -19,14 +19,16 @@ int main(void) { + const char fname[] = "test_bufio_namedpipe_async.fifo"; + const char testname[] = "bufio_test_namedpipe_async"; char buf[16]; - unlink("test_bufio_namedpipe.fifo"); - assert(mkfifo("test_bufio_namedpipe.fifo", S_IRUSR | S_IWUSR) == 0); + unlink(fname); + assert(mkfifo(fname, S_IRUSR | S_IWUSR) == 0); FORK_CHILD // Open a reader - bufio_stream *si = bufio_open("test_bufio_namedpipe.fifo", "r", 1000, 256, "bufio_test_namedpipe"); + bufio_stream *si = bufio_open(fname, "r", 1000, 256, testname); assert(si != NULL); // Wait until writer is connected @@ -42,7 +44,7 @@ int main(void) FORK_PARENT // Open a writer - this waits until a reader is present - bufio_stream *so = bufio_open("test_bufio_namedpipe.fifo", "w", 2000, 256, "bufio_test_namedpipe"); + bufio_stream *so = bufio_open(fname, "w", 2000, 256, testname); assert(so != NULL); // Delay a bit such that bufio_read enters poll() @@ -56,13 +58,20 @@ int main(void) FORK_JOIN + assert(unlink(fname) == 0); + assert(mkfifo(fname, S_IRUSR | S_IWUSR) == 0); + // Test poll - which requires a workaround on macOS FORK_CHILD // Open a reader - bufio_stream *si = bufio_open("test_bufio_namedpipe.fifo", "r", 1000, 256, "bufio_test_namedpipe"); + bufio_stream *si = bufio_open(fname, "r", 1000, 256, testname); assert(si != NULL); + // Wait until writer is present + usleep(100000); + // Attempt to read immediately - which will enter poll() + bufio_timeout(si, 1000); assert(bufio_read(si, buf, 16) == 16); // Clean up @@ -70,11 +79,11 @@ int main(void) FORK_PARENT // Open a writer - this waits until a reader is present - bufio_stream *so = bufio_open("test_bufio_namedpipe.fifo", "w", 
2000, 256, "bufio_test_namedpipe"); + bufio_stream *so = bufio_open(fname, "w", 2000, 256, testname); assert(so != NULL); // Delay a bit such that bufio_read enters poll() - usleep(100000); + usleep(250000); // Write something assert(bufio_write(so, buf, 16) == 16 && bufio_flush(so) == 0); @@ -87,7 +96,6 @@ int main(void) FORK_JOIN - - assert(unlink("test_bufio_namedpipe.fifo") == 0); + assert(unlink(fname) == 0); return 0; } From c1d2a86cecec7c2ad5bc302d95f4ee56da7c66a4 Mon Sep 17 00:00:00 2001 From: Felix Werner Date: Fri, 11 Oct 2024 12:17:36 +0200 Subject: [PATCH 14/15] Retry polling on named pipes on macOS to work around a bug. --- src/bufio.c | 48 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/src/bufio.c b/src/bufio.c index 1e4dc62..6544086 100644 --- a/src/bufio.c +++ b/src/bufio.c @@ -347,14 +347,43 @@ static void ignore_sigpipe(int socket __attribute__((__unused__))) // Poll which automatically restarts on EINTR and EAGAIN -static inline int safe_poll(struct pollfd fds[], nfds_t nfds, int timeout) +static inline int safe_poll(struct pollfd fds[], nfds_t nfds, int timeout, bufio_stream_type stream_type) { // TODO: Automatically decrement timeout, use ppoll on Linux int rc; + int i = 0; + int num_loops = 0; +#ifdef __MACH__ + if (stream_type == BUFIO_FIFO && timeout != 0) { + // Work around macOS returning 0 for named pipes even though data arrives + // within the timeout. This bug exists at least up to macOS Sonoma 14.7. We + // could simply call poll with timeout = 0 after hitting the timeout. To + // reduce the stuttering we split the timeout into periods of at most 20 ms + // and repeat poll as many times as requested (indefinitely for timeout = -1). 
+ const int max_poll_period_msec = 20; + if (timeout == -1) { + // Poll indefinitely in short steps + num_loops = -1; + timeout = max_poll_period_msec; + } else if (timeout > 0 && timeout < 2 * max_poll_period_msec) { + // Split interval into 2 polls + num_loops = 1; + if (timeout > 1) + timeout /= 2; + } else if (timeout >= 2 * max_poll_period_msec) { + // Loop as many times as needed in short steps + num_loops = timeout / max_poll_period_msec; + timeout = max_poll_period_msec; + } + } +#else + (void) stream_type; +#endif do { rc = poll(fds, nfds, timeout); - } while ((rc == -1) && (errno == EINTR || errno == EAGAIN)); + debug_print("rc=%d, i=%d, nl=%d, timeout=%d, events=%d, revents=%d, error=%s", rc, i, num_loops, timeout, fds[0].events, fds[0].revents, rc == -1 ? strerror(errno): "none"); + } while (((rc == -1) && (errno == EINTR || errno == EAGAIN)) || (rc == 0 && (num_loops < 0 || i++ < num_loops))); // num_loops < 0: caller asked for timeout = -1, so keep re-polling until an event or error return rc; } @@ -439,7 +468,7 @@ static int accept_socket(bufio_stream *stream, int timeout, const char* info) poll_in.fd = stream->fd; poll_in.events = POLLIN; - int rc = safe_poll(&poll_in, 1, timeout); + int rc = safe_poll(&poll_in, 1, timeout, stream->type); if (rc == 0) { logstring(info, "listen timeout"); return 0; @@ -926,6 +955,7 @@ and the status code of the stream was set. read_vec[0].iov_base = (char *) ptr + (size - remaining_bytes); read_vec[0].iov_len = remaining_bytes; + errno = 0; ssize_t nbytes = readv(stream->fd, read_vec, 2); if (nbytes == -1) { if (errno == EAGAIN || errno == EINTR) @@ -969,7 +999,7 @@ and the status code of the stream was set. remaining_bytes -= nbytes; } } while (remaining_bytes > 0 && - (poll_rc = safe_poll(&poll_in, 1, stream->io_timeout_ms)) == 1 && + (poll_rc = safe_poll(&poll_in, 1, stream->io_timeout_ms, stream->type)) == 1 && poll_in.revents & POLLIN); bufio_release_read_lock(stream); @@ -982,7 +1012,7 @@ and the status code of the stream was set.
else if (poll_in.revents & POLLERR) stream->status = -EIO; // EIO comes closest to "an exceptional condition" else if (poll_rc == 0) { - debug_print("timeout with %zu remaining bytes (%zu bytes requested)", remaining_bytes, size); + debug_print("timeout with %zu remaining bytes (%zu bytes requested, timeout=%d)", remaining_bytes, size, stream->io_timeout_ms); stream->status = BUFIO_TIMEDOUT; } @@ -1083,7 +1113,7 @@ error has occured and the status code of the stream was set. stream->write_lock_offset += nbytes; ptr = (char *) ptr + nbytes; } while (remaining_bytes > 0 && - (poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms)) == 1 && + (poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms, stream->type)) == 1 && poll_out.revents == POLLOUT); if (remaining_bytes == 0) @@ -1151,7 +1181,7 @@ error has occured and the status code of the stream was set. remaining_bytes -= nbytes; } } while (remaining_bytes > 0 && - (poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms)) == 1 && + (poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms, stream->type)) == 1 && poll_out.revents == POLLOUT); if (remaining_bytes == 0) @@ -1238,7 +1268,7 @@ of the stream was set. output_buffer_head += nbytes; stream->write_lock_offset += nbytes; } while (output_buffer_head != stream->output_buffer_tail && - (poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms)) == 1 && + (poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms, stream->type)) == 1 && poll_out.revents == POLLOUT); bufio_release_write_lock(stream); @@ -1424,7 +1454,7 @@ input buffers. If the value of timeout is -1, the poll blocks indefinitely. 
poll_in.events = POLLIN; poll_in.revents = 0; - int rc = safe_poll(&poll_in, 1, timeout); + int rc = safe_poll(&poll_in, 1, timeout, stream->type); if (rc == 0) { stream->status = BUFIO_TIMEDOUT; return 0; // Timeout From 314ea503949916317f939c4a34cc32e01b684991 Mon Sep 17 00:00:00 2001 From: Felix Werner Date: Fri, 11 Oct 2024 12:17:52 +0200 Subject: [PATCH 15/15] Ignore sigpipe for pipes and named pipes. --- src/bufio.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/bufio.c b/src/bufio.c index 6544086..bf3630d 100644 --- a/src/bufio.c +++ b/src/bufio.c @@ -721,6 +721,9 @@ application code does not crash during writes to a broken pipe. goto close_free_and_out; } + if (stream->type == BUFIO_FIFO || stream->type == BUFIO_PIPE) + ignore_sigpipe(stream->fd); + return stream; }