8000 Allow sending commands after sending an unsubscribe by bjosv · Pull Request #1036 · redis/hiredis · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Allow sending commands after sending an unsubscribe #1036

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion async.c
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,15 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
/* Unset subscribed flag only when no pipelined pending subscribe. */
if (reply->element[2]->integer == 0
&& dictSize(ac->sub.channels) == 0
&& dictSize(ac->sub.patterns) == 0)
&& dictSize(ac->sub.patterns) == 0) {
c->flags &= ~REDIS_SUBSCRIBED;

/* Move ongoing regular command callbacks. */
redisCallback cb;
while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) {
__redisPushCallback(&ac->replies,&cb);
}
}
}
}
sdsfree(sname);
Expand Down
41 changes: 24 additions & 17 deletions test.c
Original file line number Diff line number Diff line change
Expand Up @@ -1454,8 +1454,15 @@ typedef struct TestState {
redisOptions *options;
int checkpoint;
int resp3;
int disconnect;
} TestState;

/* Helper to disconnect and stop event loop */
void async_disconnect(redisAsyncContext *ac) {
redisAsyncDisconnect(ac);
event_base_loopbreak(base);
}

/* Testcase timeout, will trigger a failure */
void timeout_cb(int fd, short event, void *arg) {
(void) fd; (void) event; (void) arg;
Expand All @@ -1480,9 +1487,18 @@ void publish_msg(redisOptions *options, const char* channel, const char* msg) {
disconnect(c, 0);
}

/* Expect a reply of type INTEGER */
void integer_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER);
state->checkpoint++;
if (state->disconnect) async_disconnect(ac);
}

/* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3:
* - a published message triggers an unsubscribe
* - an unsubscribe response triggers a disconnect */
* - a command is sent before the unsubscribe response is received. */
void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;
Expand All @@ -1505,13 +1521,13 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
< FF63 /td> redisAsyncCommand(ac,unexpected_cb,
(void*)"unsubscribe should call subscribe_cb()",
"unsubscribe");
/* Send a regular command after unsubscribing, then disconnect */
state->disconnect = 1;
redisAsyncCommand(ac,integer_cb,state,"LPUSH mylist foo");

} else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) {
assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
reply->element[2]->str == NULL);

/* Disconnect after unsubscribe */
redisAsyncDisconnect(ac);
event_base_loopbreak(base);
} else {
printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
exit(1);
Expand All @@ -1520,11 +1536,11 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {

/* Expect a reply of type ARRAY */
void array_cb(redisAsyncContext *ac, void *r, void *privdata) {
(void) ac;
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY);
state->checkpoint++;
if (state->disconnect) async_disconnect(ac);
}

static void test_pubsub_handling(struct config config) {
Expand Down Expand Up @@ -1557,7 +1573,7 @@ static void test_pubsub_handling(struct config config) {
event_base_free(base);

/* Verify test checkpoints */
assert(state.checkpoint == 2);
assert(state.checkpoint == 3);
}

/* Unexpected push message, will trigger a failure */
Expand All @@ -1567,15 +1583,6 @@ void unexpected_push_cb(redisAsyncContext *ac, void *r) {
exit(1);
}

/* Expect a reply of type INTEGER */
void integer_cb(redisAsyncContext *ac, void *r, void *privdata) {
(void) ac;
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER);
state->checkpoint++;
}

static void test_pubsub_handling_resp3(struct config config) {
test("Subscribe, handle published message and unsubscribe using RESP3: ");
/* Setup event dispatcher with a testcase timeout */
Expand Down Expand Up @@ -1616,7 +1623,7 @@ static void test_pubsub_handling_resp3(struct config config) {
event_base_free(base);

/* Verify test checkpoints */
assert(state.checkpoint == 5);
assert(state.checkpoint == 6);
}
#endif

Expand Down
0