From 1a3c1e46b50d4d89aaec2061613bdf3d064c75e0 Mon Sep 17 00:00:00 2001 From: Vitaliy Orazov Date: Fri, 19 Aug 2022 17:10:22 +0300 Subject: [PATCH 1/9] BareSip. Add a state update action to the main loop to unblock polling if another thread has affected the state --- src/main/main.c | 60 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/src/main/main.c b/src/main/main.c index 3584186d1..0c9ec5de2 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -105,6 +105,7 @@ struct re { thrd_t tid; /**< Thread id */ RE_ATOMIC bool thread_enter; /**< Thread enter is called */ struct re_async *async; /**< Async object */ + re_sock_t fd_refresh[2]; /**< Refresh the thread state */ }; static struct re *re_global = NULL; @@ -112,6 +113,9 @@ static tss_t key; static once_flag flag = ONCE_FLAG_INIT; static void poll_close(struct re *re); +static int init_refreshing(struct re *re); +static void close_refreshing(struct re *re); +static void refresh(struct re *re); static void re_destructor(void *arg) @@ -1075,6 +1079,10 @@ int re_main(re_signal_h *signalh) err = poll_setup(re); if (err) goto out; + + err = init_refreshing(re); + if (err) + goto out; DEBUG_INFO("Using async I/O polling method: `%s'\n", poll_method_name(re->method)); @@ -1123,6 +1131,7 @@ int re_main(re_signal_h *signalh) out: re_atomic_rlx_set(&re->polling, false); + close_refreshing(re); return err; } @@ -1355,6 +1364,7 @@ void re_thread_leave(void) re_atomic_rlx_set(&re->thread_enter, false); re_unlock(re); + refresh(re); } @@ -1542,3 +1552,53 @@ int re_thread_async(re_async_work_h *work, re_async_h *cb, void *arg) return re_async(re->async, work, cb, arg); } + +static void refreshing_handler(int flags, void *arg) { + struct re *re = arg; + int msg = 0; + + if (!(flags & FD_READ)) + return; + + read(re->fd_refresh[0], &msg, sizeof(msg)); +} + +static int init_refreshing(struct re *re) +{ + int err; + re->fd_refresh[0] = re->fd_refresh[1] = BAD_SOCK; + if (pipe(re->fd_refresh) < 0) + return ERRNO_SOCK; + + err = net_sockopt_blocking_set(re->fd_refresh[0], false); + if (err) + goto out; + + err = net_sockopt_blocking_set(re->fd_refresh[1], false); + if (err) + goto out; + + err = fd_listen(re->fd_refresh[0], FD_READ, refreshing_handler, re); + if (err) + goto out; + return 0; +out: + (void)close(re->fd_refresh[0]); + (void)close(re->fd_refresh[1]); + return err; +} + +static void close_refreshing(struct re *re) +{ + fd_close(re->fd_refresh[0]); + (void)close(re->fd_refresh[0]); + (void)close(re->fd_refresh[1]); +} + +static void refresh(struct re *re) +{ + int msg = 0; + + if (re) + write(re->fd_refresh[1], &msg, sizeof(msg)); +} \ No newline at end of file From 8a58e184eb193202d7953770ee77895e1b3f54e9 Mon Sep 17 00:00:00 2001 From: Vitaliy Orazov Date: Fri, 19 Aug 2022 18:04:58 +0300 Subject: [PATCH 2/9] fix ccheck warnings --- src/main/main.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/main.c b/src/main/main.c index 0c9ec5de2..240dc13e5 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -105,7 +105,7 @@ struct re { thrd_t tid; /**< Thread id */ RE_ATOMIC bool thread_enter; /**< Thread enter is called */ struct re_async *async; /**< Async object */ - re_sock_t fd_refresh[2]; /**< Refresh the thread state */ + re_sock_t fd_refresh[2]; /**< Refresh the thread state */ }; static struct re *re_global = NULL; @@ -1079,7 +1079,7 @@ int re_main(re_signal_h *signalh) err = poll_setup(re); if (err) goto out; - + err = init_refreshing(re); if (err) goto out; @@ -1601,4 +1601,4 @@ static void refresh(struct re *re) if (re) write(re->fd_refresh[1], &msg, sizeof(msg)); -} \ No newline at end of file +} From b6eca508e0706c4121b0b49337fde48acb12464a Mon Sep 17 00:00:00 2001 From: Vitaliy Orazov Date: Fri, 19 Aug 2022 18:44:55 +0300 Subject: [PATCH 3/9] use mqueue instead system pipe --- src/main/main.c | 52 +++++++++++++------------------------------------ 1 file changed, 13 insertions(+), 39 deletions(-) diff --git a/src/main/main.c b/src/main/main.c index 240dc13e5..9f1ca260f 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -49,6 +49,7 @@ #include #include #include +#include #include "main.h" @@ -105,7 +106,7 @@ struct re { thrd_t tid; /**< Thread id */ RE_ATOMIC bool thread_enter; /**< Thread enter is called */ struct re_async *async; /**< Async object */ - re_sock_t fd_refresh[2]; /**< Refresh the thread state */ + struct mqueue *mq_refresh; /**< Refresh the thread state */ }; static struct re *re_global = NULL; @@ -1553,52 +1554,25 @@ int re_thread_async(re_async_work_h *work, re_async_h *cb, void *arg) return re_async(re->async, work, cb, arg); } -static void refreshing_handler(int flags, void *arg) { - struct re *re = arg; - int msg = 0; - - if (!(flags & FD_READ)) - return; - - read(re->fd_refresh[0], &msg, sizeof(msg)); +static void refreshing_handler(int id, void *data, void *arg) +{ + (void)id; + (void)data; + (void)arg; } -static int init_refreshing(struct re *re) +static int init_refreshing(struct re *re) { - int err; - re->fd_refresh[0] = re->fd_refresh[1] = BAD_SOCK; - if (pipe(re->fd_refresh) < 0) - return ERRNO_SOCK; - - err = net_sockopt_blocking_set(re->fd_refresh[0], false); - if (err) - goto out; - - err = net_sockopt_blocking_set(re->fd_refresh[1], false); - if (err) - goto out; - - err = fd_listen(re->fd_refresh[0], FD_READ, refreshing_handler, re); - if (err) - goto out; - return 0; -out: - (void)close(re->fd_refresh[0]); - (void)close(re->fd_refresh[1]); - return err; + return mqueue_alloc(&re->mq_refresh, refreshing_handler, 0); } -static void close_refreshing(struct re *re) +static void close_refreshing(struct re *re) { - fd_close(re->fd_refresh[0]); - (void)close(re->fd_refresh[0]); - (void)close(re->fd_refresh[1]); + mem_deref(re->mq_refresh); } -static void refresh(struct re *re) +static void refresh(struct re *re) { - int msg = 0; - if (re) - write(re->fd_refresh[1], &msg, sizeof(msg)); + mqueue_push(re->mq_refresh, 0, NULL); } From 71832267c1d9a0806e4d2428b247c69267b3030a Mon Sep 17 00:00:00 2001 From: Vitaliy Orazov Date: Fri, 19 Aug 2022 18:49:47 +0300 Subject: [PATCH 4/9] fix ccheck warnings --- src/main/main.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/main.c b/src/main/main.c index 9f1ca260f..f77f71e56 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -1554,24 +1554,24 @@ int re_thread_async(re_async_work_h *work, re_async_h *cb, void *arg) return re_async(re->async, work, cb, arg); } -static void refreshing_handler(int id, void *data, void *arg) +static void refreshing_handler(int id, void *data, void *arg) { (void)id; (void)data; (void)arg; } -static int init_refreshing(struct re *re) +static int init_refreshing(struct re *re) { return mqueue_alloc(&re->mq_refresh, refreshing_handler, 0); } -static void close_refreshing(struct re *re) +static void close_refreshing(struct re *re) { mem_deref(re->mq_refresh); } -static void refresh(struct re *re) +static void refresh(struct re *re) { if (re) mqueue_push(re->mq_refresh, 0, NULL); From 075b7c143ad52371f9bee12e5accc8bc7b8949d6 Mon Sep 17 00:00:00 2001 From: Vitaliy Orazov Date: Sat, 20 Aug 2022 19:16:02 +0300 Subject: [PATCH 5/9] Revert "fix ccheck warnings" This reverts commit 71832267c1d9a0806e4d2428b247c69267b3030a. --- src/main/main.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/main.c b/src/main/main.c index f77f71e56..9f1ca260f 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -1554,24 +1554,24 @@ int re_thread_async(re_async_work_h *work, re_async_h *cb, void *arg) return re_async(re->async, work, cb, arg); } -static void refreshing_handler(int id, void *data, void *arg) +static void refreshing_handler(int id, void *data, void *arg) { (void)id; (void)data; (void)arg; } -static int init_refreshing(struct re *re) +static int init_refreshing(struct re *re) { return mqueue_alloc(&re->mq_refresh, refreshing_handler, 0); } -static void close_refreshing(struct re *re) +static void close_refreshing(struct re *re) { mem_deref(re->mq_refresh); } -static void refresh(struct re *re) +static void refresh(struct re *re) { if (re) mqueue_push(re->mq_refresh, 0, NULL); From c1041b29b03af5a86a73f64a64415271db140814 Mon Sep 17 00:00:00 2001 From: Vitaliy Orazov Date: Sat, 20 Aug 2022 19:16:15 +0300 Subject: [PATCH 6/9] Revert "use mqueue instead system pipe" This reverts commit b6eca508e0706c4121b0b49337fde48acb12464a. --- src/main/main.c | 52 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 39 insertions(+), 13 deletions(-) diff --git a/src/main/main.c b/src/main/main.c index 9f1ca260f..240dc13e5 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -49,7 +49,6 @@ #include #include #include -#include #include "main.h" @@ -106,7 +105,7 @@ struct re { thrd_t tid; /**< Thread id */ RE_ATOMIC bool thread_enter; /**< Thread enter is called */ struct re_async *async; /**< Async object */ - struct mqueue *mq_refresh; /**< Refresh the thread state */ + re_sock_t fd_refresh[2]; /**< Refresh the thread state */ }; static struct re *re_global = NULL; @@ -1554,25 +1553,52 @@ int re_thread_async(re_async_work_h *work, re_async_h *cb, void *arg) return re_async(re->async, work, cb, arg); } -static void refreshing_handler(int id, void *data, void *arg) -{ - (void)id; - (void)data; - (void)arg; +static void refreshing_handler(int flags, void *arg) { + struct re *re = arg; + int msg = 0; + + if (!(flags & FD_READ)) + return; + + read(re->fd_refresh[0], &msg, sizeof(msg)); } -static int init_refreshing(struct re *re) +static int init_refreshing(struct re *re) { - return mqueue_alloc(&re->mq_refresh, refreshing_handler, 0); + int err; + re->fd_refresh[0] = re->fd_refresh[1] = BAD_SOCK; + if (pipe(re->fd_refresh) < 0) + return ERRNO_SOCK; + + err = net_sockopt_blocking_set(re->fd_refresh[0], false); + if (err) + goto out; + + err = net_sockopt_blocking_set(re->fd_refresh[1], false); + if (err) + goto out; + + err = fd_listen(re->fd_refresh[0], FD_READ, refreshing_handler, re); + if (err) + goto out; + return 0; +out: + (void)close(re->fd_refresh[0]); + (void)close(re->fd_refresh[1]); + return err; } -static void close_refreshing(struct re *re) +static void close_refreshing(struct re *re) { - mem_deref(re->mq_refresh); + fd_close(re->fd_refresh[0]); + (void)close(re->fd_refresh[0]); + (void)close(re->fd_refresh[1]); } -static void refresh(struct re *re) +static void refresh(struct re *re) { + int msg = 0; + if (re) - mqueue_push(re->mq_refresh, 0, NULL); + write(re->fd_refresh[1], &msg, sizeof(msg)); } From 2e3de9c628048e842653b2dff6019086f9611aa4 Mon Sep 17 00:00:00 2001 From: Vitaliy Orazov Date: Sat, 20 Aug 2022 19:16:29 +0300 Subject: [PATCH 7/9] Revert "fix ccheck warnings" This reverts commit 8a58e184eb193202d7953770ee77895e1b3f54e9. --- src/main/main.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/main.c b/src/main/main.c index 240dc13e5..0c9ec5de2 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -105,7 +105,7 @@ struct re { thrd_t tid; /**< Thread id */ RE_ATOMIC bool thread_enter; /**< Thread enter is called */ struct re_async *async; /**< Async object */ - re_sock_t fd_refresh[2]; /**< Refresh the thread state */ + re_sock_t fd_refresh[2]; /**< Refresh the thread state */ }; static struct re *re_global = NULL; @@ -1079,7 +1079,7 @@ int re_main(re_signal_h *signalh) err = poll_setup(re); if (err) goto out; - + err = init_refreshing(re); if (err) goto out; @@ -1601,4 +1601,4 @@ static void refresh(struct re *re) if (re) write(re->fd_refresh[1], &msg, sizeof(msg)); -} +} \ No newline at end of file From 107dcb8a1c84404934ddefcfc2ce79764a6f0fb7 Mon Sep 17 00:00:00 2001 From: Vitaliy Orazov Date: Sat, 20 Aug 2022 19:16:55 +0300 Subject: [PATCH 8/9] Revert "BareSip. Add a state update action to the main loop to unblock polling if another thread has affected the state" This reverts commit 1a3c1e46b50d4d89aaec2061613bdf3d064c75e0. --- src/main/main.c | 60 ------------------------------------------------- 1 file changed, 60 deletions(-) diff --git a/src/main/main.c b/src/main/main.c index 0c9ec5de2..3584186d1 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -105,7 +105,6 @@ struct re { thrd_t tid; /**< Thread id */ RE_ATOMIC bool thread_enter; /**< Thread enter is called */ struct re_async *async; /**< Async object */ - re_sock_t fd_refresh[2]; /**< Refresh the thread state */ }; static struct re *re_global = NULL; @@ -113,9 +112,6 @@ static tss_t key; static once_flag flag = ONCE_FLAG_INIT; static void poll_close(struct re *re); -static int init_refreshing(struct re *re); -static void close_refreshing(struct re *re); -static void refresh(struct re *re); static void re_destructor(void *arg) @@ -1079,10 +1075,6 @@ int re_main(re_signal_h *signalh) err = poll_setup(re); if (err) goto out; - - err = init_refreshing(re); - if (err) - goto out; DEBUG_INFO("Using async I/O polling method: `%s'\n", poll_method_name(re->method)); @@ -1131,7 +1123,6 @@ int re_main(re_signal_h *signalh) out: re_atomic_rlx_set(&re->polling, false); - close_refreshing(re); return err; } @@ -1364,7 +1355,6 @@ void re_thread_leave(void) re_atomic_rlx_set(&re->thread_enter, false); re_unlock(re); - refresh(re); } @@ -1552,53 +1542,3 @@ int re_thread_async(re_async_work_h *work, re_async_h *cb, void *arg) return re_async(re->async, work, cb, arg); } - -static void refreshing_handler(int flags, void *arg) { - struct re *re = arg; - int msg = 0; - - if (!(flags & FD_READ)) - return; - - read(re->fd_refresh[0], &msg, sizeof(msg)); -} - -static int init_refreshing(struct re *re) -{ - int err; - re->fd_refresh[0] = re->fd_refresh[1] = BAD_SOCK; - if (pipe(re->fd_refresh) < 0) - return ERRNO_SOCK; - - err = net_sockopt_blocking_set(re->fd_refresh[0], false); - if (err) - goto out; - - err = net_sockopt_blocking_set(re->fd_refresh[1], false); - if (err) - goto out; - - err = fd_listen(re->fd_refresh[0], FD_READ, refreshing_handler, re); - if (err) - goto out; - return 0; -out: - (void)close(re->fd_refresh[0]); - (void)close(re->fd_refresh[1]); - return err; -} - -static void close_refreshing(struct re *re) -{ - fd_close(re->fd_refresh[0]); - (void)close(re->fd_refresh[0]); - (void)close(re->fd_refresh[1]); -} - -static void refresh(struct re *re) -{ - int msg = 0; - - if (re) - write(re->fd_refresh[1], &msg, sizeof(msg)); -} \ No newline at end of file From 9429082ead88bcee8d20ecc415bf5b42e23547f6 Mon Sep 17 00:00:00 2001 From: Vitaliy Orazov Date: Sun, 21 Aug 2022 23:28:35 +0300 Subject: [PATCH 9/9] ensure timers are properly handled --- src/main/main.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/main.c b/src/main/main.c index 3584186d1..57b194b85 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -1352,7 +1352,9 @@ void re_thread_leave(void) DEBUG_WARNING("re_thread_leave: re not ready\n"); return; } - + /* Dummy async event, to ensure timers are properly handled */ + if (re->async) + re_thread_async(NULL, NULL, NULL); re_atomic_rlx_set(&re->thread_enter, false); re_unlock(re); }