8000 Stability: Support calling redisAsyncCommand and redisAsyncDisconnect from the onConnected callback by kristjanvalur · Pull Request #931 · redis/hiredis · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Stability: Support calling redisAsyncCommand and redisAsyncDisconnect from the onConnected callback #931

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 54 additions & 10 deletions async.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,34 @@ static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) {
}
}

/* Invoke the user-registered onConnect callback, if there is one.
 *
 * REDIS_IN_CALLBACK is raised on the context for the duration of the call
 * and lowered again afterwards. If the flag is already set on entry (we are
 * running nested inside another callback), it is left untouched so the
 * outer callback's marker is not cleared early. */
static void __redisRunConnectCallback(redisAsyncContext *ac, int status)
{
    if (ac->onConnect == NULL)
        return;

    if (ac->c.flags & REDIS_IN_CALLBACK) {
        /* Nested invocation: the outer frame owns the flag. */
        ac->onConnect(ac, status);
        return;
    }

    ac->c.flags |= REDIS_IN_CALLBACK;
    ac->onConnect(ac, status);
    ac->c.flags &= ~REDIS_IN_CALLBACK;
}

/* Invoke the user-registered onDisconnect callback, if there is one.
 *
 * REDIS_IN_CALLBACK is raised on the context for the duration of the call
 * and lowered again afterwards. If the flag is already set on entry (we are
 * running nested inside another callback), it is left untouched so the
 * outer callback's marker is not cleared early. */
static void __redisRunDisconnectCallback(redisAsyncContext *ac, int status)
{
    if (ac->onDisconnect == NULL)
        return;

    if (ac->c.flags & REDIS_IN_CALLBACK) {
        /* Nested invocation: the outer frame owns the flag. */
        ac->onDisconnect(ac, status);
        return;
    }

    ac->c.flags |= REDIS_IN_CALLBACK;
    ac->onDisconnect(ac, status);
    ac->c.flags &= ~REDIS_IN_CALLBACK;
}

/* Helper function to free the context. */
static void __redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
Expand Down Expand Up @@ -338,12 +366,11 @@ static void __redisAsyncFree(redisAsyncContext *ac) {

/* Execute disconnect callback. When redisAsyncFree() initiated destroying
* this context, the status will always be REDIS_OK. */
if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
if (c->flags & REDIS_FREEING) {
ac->onDisconnect(ac,REDIS_OK);
} else {
ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
}
if (c->flags & REDIS_CONNECTED) {
int status = ac->err == 0 ? REDIS_OK : REDIS_ERR;
if (c->flags & REDIS_FREEING)
status = REDIS_OK;
__redisRunDisconnectCallback(ac, status);
}

if (ac->dataCleanup) {
Expand Down Expand Up @@ -603,7 +630,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
}

static void __redisAsyncHandleConnectFailure(redisAsyncContext *ac) {
if (ac->onConnect) ac->onConnect(ac, REDIS_ERR);
__redisRunConnectCallback(ac, REDIS_ERR);
__redisAsyncDisconnect(ac);
}

Expand All @@ -628,8 +655,19 @@ static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
return REDIS_ERR;
}

if (ac->onConnect) ac->onConnect(ac, REDIS_OK);
/* flag us as fully connected, but allow the callback
* to disconnect. For that reason, permit the function
* to delete the context here after callback return.
*/
c->flags |= REDIS_CONNECTED;
__redisRunConnectCallback(ac, REDIS_OK);
if ((ac->c.flags & REDIS_DISCONNECTING)) {
redisAsyncDisconnect(ac);
return REDIS_ERR;
} else if ((ac->c.flags & REDIS_FREEING)) {
redisAsyncFree(ac);
return REDIS_ERR;
}
return REDIS_OK;
} else {
return REDIS_OK;
Expand All @@ -653,6 +691,8 @@ void redisAsyncRead(redisAsyncContext *ac) {
*/
void redisAsyncHandleRead(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
/* must not be called from a callback */
assert(!(c->flags & REDIS_IN_CALLBACK));

if (!(c->flags & REDIS_CONNECTED)) {
/* Abort connect was not successful. */
Expand Down Expand Up @@ -686,6 +726,8 @@ void redisAsyncWrite(redisAsyncContext *ac) {

void redisAsyncHandleWrite(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
/* must not be called from a callback */
assert(!(c->flags & REDIS_IN_CALLBACK));

if (!(c->flags & REDIS_CONNECTED)) {
/* Abort connect was not successful. */
Expand All @@ -702,6 +744,8 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) {
void redisAsyncHandleTimeout(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb;
/* must not be called from a callback */
assert(!(c->flags & REDIS_IN_CALLBACK));

if ((c->flags & REDIS_CONNECTED)) {
if (ac->replies.head == NULL && ac->sub.replies.head == NULL) {
Expand All @@ -721,8 +765,8 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) {
__redisAsyncCopyError(ac);
}

if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) {
ac->onConnect(ac, REDIS_ERR);
if (!(c->flags & REDIS_CONNECTED)) {
__redisRunConnectCallback(ac, REDIS_ERR);
}

while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
Expand Down
2 changes: 1 addition & 1 deletion async.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ typedef struct redisCallbackList {

/* Connection callback prototypes */
typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status);
typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status);
typedef void (redisConnectCallback)(struct redisAsyncContext*, int status);
typedef void(redisTimerCallback)(void *timer, void *privdata);

/* Context for an async connection to Redis */
Expand Down
54 changes: 48 additions & 6 deletions test.c
Original file line number Diff line number Diff line change
Expand Up @@ -1907,7 +1907,9 @@ typedef enum astest_no
ASTEST_CONNECT=0,
ASTEST_CONN_TIMEOUT,
ASTEST_PINGPONG,
ASTEST_PINGPONG_TIMEOUT
ASTEST_PINGPONG_TIMEOUT,
ASTEST_ISSUE_931,
ASTEST_ISSUE_931_PING
}astest_no;

/* a static context for the async tests */
Expand All @@ -1918,6 +1920,7 @@ struct _astest {
int connects;
int connect_status;
int disconnects;
int pongs;
int disconnect_status;
int connected;
int err;
Expand All @@ -1941,7 +1944,9 @@ static void asCleanup(void* data)
t->ac = NULL;
}

static void connectCallback(const redisAsyncContext *c, int status) {
static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _privdata);

static void connectCallback(redisAsyncContext *c, int status) {
struct _astest *t = (struct _astest *)c->data;
assert(t == &astest);
assert(t->connects == 0);
Expand All @@ -1950,6 +1955,15 @@ static void connectCallback(const redisAsyncContext *c, int status) {
t->connects++;
t->connect_status = status;
t->connected = status == REDIS_OK ? 1 : -1;

if (t->testno == ASTEST_ISSUE_931) {
/* disconnect again */
redisAsyncDisconnect(c);
}
else if (t->testno == ASTEST_ISSUE_931_PING)
{
status = redisAsyncCommand(c, commandCallback, NULL, "PING");
}
}
static void disconnectCallback(const redisAsyncContext *c, int status) {
assert(c->data == (void*)&astest);
Expand All @@ -1969,20 +1983,22 @@ static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _p
(void)_privdata;
t->err = ac->err;
strcpy(t->errstr, ac->errstr);
if (t->testno == ASTEST_PINGPONG)
t->counter++;
if (t->testno == ASTEST_PINGPONG ||t->testno == ASTEST_ISSUE_931_PING)
{
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
assert(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
t->pongs++;
redisAsyncFree(ac);
}
if (t->testno == ASTEST_PINGPONG_TIMEOUT)
{
/* two ping pongs */
assert(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
if (++t->counter == 1) {
t->pongs++;
if (t->counter == 1) {
int status = redisAsyncCommand(ac, commandCallback, NULL, "PING");
assert(status == REDIS_OK);
} else {
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
redisAsyncFree(ac);
}
}
Expand Down Expand Up @@ -2089,6 +2105,7 @@ static void test_async_polling(struct config config) {
assert(status == REDIS_OK);
while(astest.ac)
redisPollTick(c, 0.1);
test_cond(astest.pongs == 1);

/* Test a ping/pong after connection that didn't time out.
* see https://github.com/redis/hiredis/issues/945
Expand All @@ -2105,8 +2122,33 @@ static void test_async_polling(struct config config) {
assert(status == REDIS_OK);
while(astest.ac)
redisPollTick(c, 0.1);
test_cond(astest.pongs == 2);
config = defaultconfig;
}

/* Test disconnect from an on_connect callback
* see https://github.com/redis/hiredis/issues/931
*/
test("Disconnect from onConnected callback (Issue #931): ");
c = do_aconnect(config, ASTEST_ISSUE_931);
while(astest.disconnects == 0)
redisPollTick(c, 0.1);
assert(astest.connected == 0);
assert(astest.connects == 1);
test_cond(astest.disconnects == 1);

/* Test ping/pong from an on_connect callback
* see https://github.com/redis/hiredis/issues/931
*/
test("Ping/Pong from onConnected callback (Issue #931): ");
c = do_aconnect(config, ASTEST_ISSUE_931_PING);
/* connect callback issues ping, response callback destroys context */
while(astest.ac)
redisPollTick(c, 0.1);
assert(astest.connected == 0);
assert(astest.connects == 1);
assert(astest.disconnects == 1);
test_cond(astest.pongs == 1);
}
/* End of Async polling_adapter driven tests */

Expand Down
0