-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Add RedisModule adapter #1182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Add RedisModule adapter #1182
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
#ifndef __HIREDIS_REDISMODULEAPI_H__ | ||
#define __HIREDIS_REDISMODULEAPI_H__ | ||
|
||
#include "redismodule.h" | ||
|
||
#include "../async.h" | ||
#include "../hiredis.h" | ||
|
||
#include <sys/types.h> | ||
|
||
typedef struct redisModuleEvents { | ||
redisAsyncContext *context; | ||
RedisModuleCtx *module_ctx; | ||
int fd; | ||
int reading, writing; | ||
int timer_active; | ||
RedisModuleTimerID timer_id; | ||
} redisModuleEvents; | ||
|
||
static inline void redisModuleReadEvent(int fd, void *privdata, int mask) { | ||
(void) fd; | ||
(void) mask; | ||
|
||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
redisAsyncHandleRead(e->context); | ||
} | ||
|
||
static inline void redisModuleWriteEvent(int fd, void *privdata, int mask) { | ||
(void) fd; | ||
(void) mask; | ||
|
||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
redisAsyncHandleWrite(e->context); | ||
} | ||
|
||
static inline void redisModuleAddRead(void *privdata) { | ||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
if (!e->reading) { | ||
e->reading = 1; | ||
RedisModule_EventLoopAdd(e->fd, REDISMODULE_EVENTLOOP_READABLE, redisModuleReadEvent, e); | ||
} | ||
} | ||
|
||
static inline void redisModuleDelRead(void *privdata) { | ||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
if (e->reading) { | ||
e->reading = 0; | ||
RedisModule_EventLoopDel(e->fd, REDISMODULE_EVENTLOOP_READABLE); | ||
} | ||
} | ||
|
||
static inline void redisModuleAddWrite(void *privdata) { | ||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
if (!e->writing) { | ||
e->writing = 1; | ||
RedisModule_EventLoopAdd(e->fd, REDISMODULE_EVENTLOOP_WRITABLE, redisModuleWriteEvent, e); | ||
} | ||
} | ||
|
||
static inline void redisModuleDelWrite(void *privdata) { | ||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
if (e->writing) { | ||
e->writing = 0; | ||
RedisModule_EventLoopDel(e->fd, REDISMODULE_EVENTLOOP_WRITABLE); | ||
} | ||
} | ||
|
||
static inline void redisModuleStopTimer(void *privdata) { | ||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
if (e->timer_active) { | ||
RedisModule_StopTimer(e->module_ctx, e->timer_id, NULL); | ||
} | ||
e->timer_active = 0; | ||
} | ||
|
||
static inline void redisModuleCleanup(void *privdata) { | ||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
redisModuleDelRead(privdata); | ||
redisModuleDelWrite(privdata); | ||
redisModuleStopTimer(privdata); | ||
hi_free(e); | ||
} | ||
|
||
static inline void redisModuleTimeout(RedisModuleCtx *ctx, void *privdata) { | ||
(void) ctx; | ||
|
||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
e->timer_active = 0; | ||
redisAsyncHandleTimeout(e->context); | ||
} | ||
|
||
static inline void redisModuleSetTimeout(void *privdata, struct timeval tv) { | ||
redisModuleEvents* e = (redisModuleEvents*)privdata; | ||
|
||
redisModuleStopTimer(privdata); | ||
|
||
mstime_t millis = tv.tv_sec * 1000 + tv.tv_usec / 1000.0; | ||
e->timer_id = RedisModule_CreateTimer(e->module_ctx, millis, redisModuleTimeout, e); | ||
e->timer_active = 1; | ||
} | ||
|
||
/* Check if Redis version is compatible with the adapter. */ | ||
static inline int redisModuleCompatibilityCheck(void) { | ||
if (!RedisModule_EventLoopAdd || | ||
!RedisModule_EventLoopDel || | ||
!RedisModule_CreateTimer || | ||
!RedisModule_StopTimer) { | ||
return REDIS_ERR; | ||
} | ||
return REDIS_OK; | ||
} | ||
|
||
static inline int redisModuleAttach(redisAsyncContext *ac, RedisModuleCtx *module_ctx) { | ||
redisContext *c = &(ac->c); | ||
redisModuleEvents *e; | ||
|
||
/* Nothing should be attached when something is already attached */ | ||
if (ac->ev.data != NULL) | ||
return REDIS_ERR; | ||
|
||
/* Create container for context and r/w events */ | ||
e = (redisModuleEvents*)hi_malloc(sizeof(*e)); | ||
if (e == NULL) | ||
return REDIS_ERR; | ||
|
||
e->context = ac; | ||
e->module_ctx = module_ctx; | ||
e->fd = c->fd; | ||
e->reading = e->writing = 0; | ||
e->timer_active = 0; | ||
|
||
/* Register functions to start/stop listening for events */ | ||
ac->ev.addRead = redisModuleAddRead; | ||
ac->ev.delRead = redisModuleDelRead; | ||
ac->ev.addWrite = redisModuleAddWrite; | ||
ac->ev.delWrite = redisModuleDelWrite; | ||
ac->ev.cleanup = redisModuleCleanup; | ||
ac->ev.scheduleTimer = redisModuleSetTimeout; | ||
ac->ev.data = e; | ||
|
||
return REDIS_OK; | ||
} | ||
|
||
#endif |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
#include <stdio.h> | ||
#include <stdlib.h> | ||
#include <string.h> | ||
#include <signal.h> | ||
|
||
#include <hiredis.h> | ||
#include <async.h> | ||
#include <adapters/redismoduleapi.h> | ||
|
||
void debugCallback(redisAsyncContext *c, void *r, void *privdata) { | ||
(void)privdata; //unused | ||
redisReply *reply = r; | ||
if (reply == NULL) { | ||
/* The DEBUG SLEEP command will almost always fail, because we have set a 1 second timeout */ | ||
printf("`DEBUG SLEEP` error: %s\n", c->errstr ? c->errstr : "unknown error"); | ||
return; | ||
} | ||
/* Disconnect after receiving the reply of DEBUG SLEEP (which will not)*/ | ||
redisAsyncDisconnect(c); | ||
} | ||
|
||
void getCallback(redisAsyncContext *c, void *r, void *privdata) { | ||
redisReply *reply = r; | ||
if (reply == NULL) { | ||
if (c->errstr) { | ||
printf("errstr: %s\n", c->errstr); | ||
} | ||
return; | ||
} | ||
printf("argv[%s]: %s\n", (char*)privdata, reply->str); | ||
|
||
/* start another request that demonstrate timeout */ | ||
redisAsyncCommand(c, debugCallback, NULL, "DEBUG SLEEP %f", 1.5); | ||
} | ||
|
||
void connectCallback(const redisAsyncContext *c, int status) { | ||
if (status != REDIS_OK) { | ||
printf("Error: %s\n", c->errstr); | ||
return; | ||
} | ||
printf("Connected...\n"); | ||
} | ||
|
||
void disconnectCallback(const redisAsyncContext *c, int status) { | ||
if (status != REDIS_OK) { | ||
printf("Error: %s\n", c->errstr); | ||
return; | ||
} | ||
printf("Disconnected...\n"); | ||
} | ||
|
||
/* | ||
* This example requires Redis 7.0 or above. | ||
* | ||
* 1- Compile this file as a shared library. Directory of "redismodule.h" must | ||
* be in the include path. | ||
* gcc -fPIC -shared -I../../redis/src/ -I.. example-redismoduleapi.c -o example-redismoduleapi.so | ||
* | ||
* 2- Load module: | ||
* redis-server --loadmodule ./example-redismoduleapi.so value | ||
*/ | ||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
|
||
int ret = RedisModule_Init(ctx, "example-redismoduleapi", 1, REDISMODULE_APIVER_1); | ||
if (ret != REDISMODULE_OK) { | ||
printf("error module init \n"); | ||
return REDISMODULE_ERR; | ||
} | ||
|
||
if (redisModuleCompatibilityCheck() != REDIS_OK) { | ||
printf("Redis 7.0 or above is required! \n"); | ||
return REDISMODULE_ERR; | ||
} | ||
|
||
redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); | ||
if (c->err) { | ||
/* Let *c leak for now... */ | ||
printf("Error: %s\n", c->errstr); | ||
return 1; | ||
} | ||
|
||
size_t len; | ||
const char *val = RedisModule_StringPtrLen(argv[argc-1], &len); | ||
|
||
RedisModuleCtx *module_ctx = RedisModule_GetDetachedThreadSafeContext(ctx); | ||
redisModuleAttach(c, module_ctx); | ||
tezc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
redisAsyncSetConnectCallback(c,connectCallback); | ||
redisAsyncSetDisconnectCallback(c,disconnectCallback); | ||
redisAsyncSetTimeout(c, (struct timeval){ .tv_sec = 1, .tv_usec = 0}); | ||
|
||
/* | ||
In this demo, we first `set key`, then `get key` to demonstrate the basic usage of the adapter. | ||
Then in `getCallback`, we start a `debug sleep` command to create 1.5 second long request. | ||
Because we have set a 1 second timeout to the connection, the command will always fail with a | ||
timeout error, which is shown in the `debugCallback`. | ||
*/ | ||
|
||
redisAsyncCommand(c, NULL, NULL, "SET key %b", val, len); | ||
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); | ||
return 0; | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.