diff options
Diffstat (limited to 'contrib/hiredis/hiredis.c')
-rw-r--r-- | contrib/hiredis/hiredis.c | 602 |
1 files changed, 398 insertions, 204 deletions
diff --git a/contrib/hiredis/hiredis.c b/contrib/hiredis/hiredis.c index 0f87bc323..446ceb1e6 100644 --- a/contrib/hiredis/hiredis.c +++ b/contrib/hiredis/hiredis.c @@ -34,7 +34,6 @@ #include "fmacros.h" #include <string.h> #include <stdlib.h> -#include <unistd.h> #include <assert.h> #include <errno.h> #include <ctype.h> @@ -42,12 +41,28 @@ #include "hiredis.h" #include "net.h" #include "sds.h" +#include "async.h" +#include "win32.h" + +extern int redisContextUpdateConnectTimeout(redisContext *c, const struct timeval *timeout); +extern int redisContextUpdateCommandTimeout(redisContext *c, const struct timeval *timeout); + +static redisContextFuncs redisContextDefaultFuncs = { + .close = redisNetClose, + .free_privctx = NULL, + .async_read = redisAsyncRead, + .async_write = redisAsyncWrite, + .read = redisNetRead, + .write = redisNetWrite +}; static redisReply *createReplyObject(int type); static void *createStringObject(const redisReadTask *task, char *str, size_t len); -static void *createArrayObject(const redisReadTask *task, int elements); +static void *createArrayObject(const redisReadTask *task, size_t elements); static void *createIntegerObject(const redisReadTask *task, long long value); +static void *createDoubleObject(const redisReadTask *task, double value, char *str, size_t len); static void *createNilObject(const redisReadTask *task); +static void *createBoolObject(const redisReadTask *task, int bval); /* Default set of functions to build the reply. Keep in mind that such a * function returning NULL is interpreted as OOM. */ @@ -55,13 +70,15 @@ static redisReplyObjectFunctions defaultFunctions = { createStringObject, createArrayObject, createIntegerObject, + createDoubleObject, createNilObject, + createBoolObject, freeReplyObject }; /* Create a reply object */ static redisReply *createReplyObject(int type) { - redisReply *r = calloc(1,sizeof(*r)); + redisReply *r = hi_calloc(1,sizeof(*r)); if (r == NULL) return NULL; @@ -80,23 +97,29 @@ void freeReplyObject(void *reply) { switch(r->type) { case REDIS_REPLY_INTEGER: + case REDIS_REPLY_NIL: + case REDIS_REPLY_BOOL: break; /* Nothing to free */ case REDIS_REPLY_ARRAY: + case REDIS_REPLY_MAP: + case REDIS_REPLY_SET: + case REDIS_REPLY_PUSH: if (r->element != NULL) { for (j = 0; j < r->elements; j++) - if (r->element[j] != NULL) - freeReplyObject(r->element[j]); - free(r->element); + freeReplyObject(r->element[j]); + hi_free(r->element); } break; case REDIS_REPLY_ERROR: case REDIS_REPLY_STATUS: case REDIS_REPLY_STRING: - if (r->str != NULL) - free(r->str); + case REDIS_REPLY_DOUBLE: + case REDIS_REPLY_VERB: + case REDIS_REPLY_BIGNUM: + hi_free(r->str); break; } - free(r); + hi_free(r); } static void *createStringObject(const redisReadTask *task, char *str, size_t len) { @@ -107,39 +130,56 @@ static void *createStringObject(const redisReadTask *task, char *str, size_t len if (r == NULL) return NULL; - buf = malloc(len+1); - if (buf == NULL) { - freeReplyObject(r); - return NULL; - } - assert(task->type == REDIS_REPLY_ERROR || task->type == REDIS_REPLY_STATUS || - task->type == REDIS_REPLY_STRING); + task->type == REDIS_REPLY_STRING || + task->type == REDIS_REPLY_VERB || + task->type == REDIS_REPLY_BIGNUM); /* Copy string value */ - memcpy(buf,str,len); - buf[len] = '\0'; + if (task->type == REDIS_REPLY_VERB) { + buf = hi_malloc(len-4+1); /* Skip 4 bytes of verbatim type header. */ + if (buf == NULL) goto oom; + + memcpy(r->vtype,str,3); + r->vtype[3] = '\0'; + memcpy(buf,str+4,len-4); + buf[len-4] = '\0'; + r->len = len - 4; + } else { + buf = hi_malloc(len+1); + if (buf == NULL) goto oom; + + memcpy(buf,str,len); + buf[len] = '\0'; + r->len = len; + } r->str = buf; - r->len = len; if (task->parent) { parent = task->parent->obj; - assert(parent->type == REDIS_REPLY_ARRAY); + assert(parent->type == REDIS_REPLY_ARRAY || + parent->type == REDIS_REPLY_MAP || + parent->type == REDIS_REPLY_SET || + parent->type == REDIS_REPLY_PUSH); parent->element[task->idx] = r; } return r; + +oom: + freeReplyObject(r); + return NULL; } -static void *createArrayObject(const redisReadTask *task, int elements) { +static void *createArrayObject(const redisReadTask *task, size_t elements) { redisReply *r, *parent; - r = createReplyObject(REDIS_REPLY_ARRAY); + r = createReplyObject(task->type); if (r == NULL) return NULL; if (elements > 0) { - r->element = calloc(elements,sizeof(redisReply*)); + r->element = hi_calloc(elements,sizeof(redisReply*)); if (r->element == NULL) { freeReplyObject(r); return NULL; @@ -150,7 +190,10 @@ static void *createArrayObject(const redisReadTask *task, int elements) { if (task->parent) { parent = task->parent->obj; - assert(parent->type == REDIS_REPLY_ARRAY); + assert(parent->type == REDIS_REPLY_ARRAY || + parent->type == REDIS_REPLY_MAP || + parent->type == REDIS_REPLY_SET || + parent->type == REDIS_REPLY_PUSH); parent->element[task->idx] = r; } return r; @@ -167,7 +210,47 @@ static void *createIntegerObject(const redisReadTask *task, long long value) { if (task->parent) { parent = task->parent->obj; - assert(parent->type == REDIS_REPLY_ARRAY); + assert(parent->type == REDIS_REPLY_ARRAY || + parent->type == REDIS_REPLY_MAP || + parent->type == REDIS_REPLY_SET || + parent->type == REDIS_REPLY_PUSH); + parent->element[task->idx] = r; + } + return r; +} + +static void *createDoubleObject(const redisReadTask *task, double value, char *str, size_t len) { + redisReply *r, *parent; + + if (len == SIZE_MAX) // Prevents hi_malloc(0) if len equals to SIZE_MAX + return NULL; + + r = createReplyObject(REDIS_REPLY_DOUBLE); + if (r == NULL) + return NULL; + + r->dval = value; + r->str = hi_malloc(len+1); + if (r->str == NULL) { + freeReplyObject(r); + return NULL; + } + + /* The double reply also has the original protocol string representing a + * double as a null terminated string. This way the caller does not need + * to format back for string conversion, especially since Redis does efforts + * to make the string more human readable avoiding the calssical double + * decimal string conversion artifacts. */ + memcpy(r->str, str, len); + r->str[len] = '\0'; + r->len = len; + + if (task->parent) { + parent = task->parent->obj; + assert(parent->type == REDIS_REPLY_ARRAY || + parent->type == REDIS_REPLY_MAP || + parent->type == REDIS_REPLY_SET || + parent->type == REDIS_REPLY_PUSH); parent->element[task->idx] = r; } return r; @@ -182,7 +265,30 @@ static void *createNilObject(const redisReadTask *task) { if (task->parent) { parent = task->parent->obj; - assert(parent->type == REDIS_REPLY_ARRAY); + assert(parent->type == REDIS_REPLY_ARRAY || + parent->type == REDIS_REPLY_MAP || + parent->type == REDIS_REPLY_SET || + parent->type == REDIS_REPLY_PUSH); + parent->element[task->idx] = r; + } + return r; +} + +static void *createBoolObject(const redisReadTask *task, int bval) { + redisReply *r, *parent; + + r = createReplyObject(REDIS_REPLY_BOOL); + if (r == NULL) + return NULL; + + r->integer = bval != 0; + + if (task->parent) { + parent = task->parent->obj; + assert(parent->type == REDIS_REPLY_ARRAY || + parent->type == REDIS_REPLY_MAP || + parent->type == REDIS_REPLY_SET || + parent->type == REDIS_REPLY_PUSH); parent->element[task->idx] = r; } return r; @@ -232,7 +338,7 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { if (*c != '%' || c[1] == '\0') { if (*c == ' ') { if (touched) { - newargv = realloc(curargv,sizeof(char*)*(argc+1)); + newargv = hi_realloc(curargv,sizeof(char*)*(argc+1)); if (newargv == NULL) goto memory_err; curargv = newargv; curargv[argc++] = curarg; @@ -286,17 +392,22 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { while (*_p != '\0' && strchr(flags,*_p) != NULL) _p++; /* Field width */ - while (*_p != '\0' && isdigit(*_p)) _p++; + while (*_p != '\0' && isdigit((int) *_p)) _p++; /* Precision */ if (*_p == '.') { _p++; - while (*_p != '\0' && isdigit(*_p)) _p++; + while (*_p != '\0' && isdigit((int) *_p)) _p++; } /* Copy va_list before consuming with va_arg */ va_copy(_cpy,ap); + /* Make sure we have more characters otherwise strchr() accepts + * '\0' as an integer specifier. This is checked after above + * va_copy() to avoid UB in fmt_invalid's call to va_end(). */ + if (*_p == '\0') goto fmt_invalid; + /* Integer conversion (without modifiers) */ if (strchr(intfmts,*_p) != NULL) { va_arg(ap,int); @@ -375,13 +486,15 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { touched = 1; c++; + if (*c == '\0') + break; } c++; } /* Add the last argument if needed */ if (touched) { - newargv = realloc(curargv,sizeof(char*)*(argc+1)); + newargv = hi_realloc(curargv,sizeof(char*)*(argc+1)); if (newargv == NULL) goto memory_err; curargv = newargv; curargv[argc++] = curarg; @@ -397,7 +510,7 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { totlen += 1+countDigits(argc)+2; /* Build the command at protocol level */ - cmd = malloc(totlen+1); + cmd = hi_malloc(totlen+1); if (cmd == NULL) goto memory_err; pos = sprintf(cmd,"*%d\r\n",argc); @@ -412,7 +525,7 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { assert(pos == totlen); cmd[pos] = '\0'; - free(curargv); + hi_free(curargv); *target = cmd; return totlen; @@ -428,15 +541,11 @@ cleanup: if (curargv) { while(argc--) sdsfree(curargv[argc]); - free(curargv); + hi_free(curargv); } sdsfree(curarg); - - /* No need to check cmd since it is the last statement that can fail, - * but do it anyway to be as defensive as possible. */ - if (cmd != NULL) - free(cmd); + hi_free(cmd); return error_type; } @@ -474,13 +583,12 @@ int redisFormatCommand(char **target, const char *format, ...) { * lengths. If the latter is set to NULL, strlen will be used to compute the * argument lengths. */ -int redisFormatSdsCommandArgv(sds *target, int argc, const char **argv, - const size_t *argvlen) +long long redisFormatSdsCommandArgv(sds *target, int argc, const char **argv, + const size_t *argvlen) { - sds cmd; - unsigned long long totlen; + sds cmd, aux; + unsigned long long totlen, len; int j; - size_t len; /* Abort on a NULL target */ if (target == NULL) @@ -499,15 +607,19 @@ int redisFormatSdsCommandArgv(sds *target, int argc, const char **argv, return -1; /* We already know how much storage we need */ - cmd = sdsMakeRoomFor(cmd, totlen); - if (cmd == NULL) + aux = sdsMakeRoomFor(cmd, totlen); + if (aux == NULL) { + sdsfree(cmd); return -1; + } + + cmd = aux; /* Construct command */ cmd = sdscatfmt(cmd, "*%i\r\n", argc); for (j=0; j < argc; j++) { len = argvlen ? argvlen[j] : strlen(argv[j]); - cmd = sdscatfmt(cmd, "$%T\r\n", len); + cmd = sdscatfmt(cmd, "$%U\r\n", len); cmd = sdscatlen(cmd, argv[j], len); cmd = sdscatlen(cmd, "\r\n", sizeof("\r\n")-1); } @@ -527,11 +639,11 @@ void redisFreeSdsCommand(sds cmd) { * lengths. If the latter is set to NULL, strlen will be used to compute the * argument lengths. */ -int redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen) { +long long redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen) { char *cmd = NULL; /* final command */ - int pos; /* position in final command */ - size_t len; - int totlen, j; + size_t pos; /* position in final command */ + size_t len, totlen; + int j; /* Abort on a NULL target */ if (target == NULL) @@ -545,7 +657,7 @@ int redisFormatCommandArgv(char **target, int argc, const char **argv, const siz } /* Build the command at protocol level */ - cmd = malloc(totlen+1); + cmd = hi_malloc(totlen+1); if (cmd == NULL) return -1; @@ -566,7 +678,7 @@ int redisFormatCommandArgv(char **target, int argc, const char **argv, const siz } void redisFreeCommand(char *cmd) { - free(cmd); + hi_free(cmd); } void __redisSetError(redisContext *c, int type, const char *str) { @@ -581,7 +693,7 @@ void __redisSetError(redisContext *c, int type, const char *str) { } else { /* Only REDIS_ERR_IO may lack a description! */ assert(type == REDIS_ERR_IO); - __redis_strerror_r(errno, c->errstr, sizeof(c->errstr)); + strerror_r(errno, c->errstr, sizeof(c->errstr)); } } @@ -589,21 +701,23 @@ redisReader *redisReaderCreate(void) { return redisReaderCreateWithFunctions(&defaultFunctions); } +static void redisPushAutoFree(void *privdata, void *reply) { + (void)privdata; + freeReplyObject(reply); +} + static redisContext *redisContextInit(void) { redisContext *c; - c = calloc(1,sizeof(redisContext)); + c = hi_calloc(1, sizeof(*c)); if (c == NULL) return NULL; - c->err = 0; - c->errstr[0] = '\0'; + c->funcs = &redisContextDefaultFuncs; + c->obuf = sdsempty(); c->reader = redisReaderCreate(); - c->tcp.host = NULL; - c->tcp.source_addr = NULL; - c->unix_sock.path = NULL; - c->timeout = NULL; + c->fd = REDIS_INVALID_FD; if (c->obuf == NULL || c->reader == NULL) { redisFree(c); @@ -616,26 +730,33 @@ static redisContext *redisContextInit(void) { void redisFree(redisContext *c) { if (c == NULL) return; - if (c->fd > 0) - close(c->fd); - if (c->obuf != NULL) - sdsfree(c->obuf); - if (c->reader != NULL) - redisReaderFree(c->reader); - if (c->tcp.host) - free(c->tcp.host); - if (c->tcp.source_addr) - free(c->tcp.source_addr); - if (c->unix_sock.path) - free(c->unix_sock.path); - if (c->timeout) - free(c->timeout); - free(c); + + if (c->funcs && c->funcs->close) { + c->funcs->close(c); + } + + sdsfree(c->obuf); + redisReaderFree(c->reader); + hi_free(c->tcp.host); + hi_free(c->tcp.source_addr); + hi_free(c->unix_sock.path); + hi_free(c->connect_timeout); + hi_free(c->command_timeout); + hi_free(c->saddr); + + if (c->privdata && c->free_privdata) + c->free_privdata(c->privdata); + + if (c->funcs && c->funcs->free_privctx) + c->funcs->free_privctx(c->privctx); + + memset(c, 0xff, sizeof(*c)); + hi_free(c); } -int redisFreeKeepFd(redisContext *c) { - int fd = c->fd; - c->fd = -1; +redisFD redisFreeKeepFd(redisContext *c) { + redisFD fd = c->fd; + c->fd = REDIS_INVALID_FD; redisFree(c); return fd; } @@ -644,8 +765,13 @@ int redisReconnect(redisContext *c) { c->err = 0; memset(c->errstr, '\0', strlen(c->errstr)); - if (c->fd > 0) { - close(c->fd); + if (c->privctx && c->funcs->free_privctx) { + c->funcs->free_privctx(c->privctx); + c->privctx = NULL; + } + + if (c->funcs && c->funcs->close) { + c->funcs->close(c); } sdsfree(c->obuf); @@ -654,126 +780,161 @@ int redisReconnect(redisContext *c) { c->obuf = sdsempty(); c->reader = redisReaderCreate(); + if (c->obuf == NULL || c->reader == NULL) { + __redisSetError(c, REDIS_ERR_OOM, "Out of memory"); + return REDIS_ERR; + } + + int ret = REDIS_ERR; if (c->connection_type == REDIS_CONN_TCP) { - return redisContextConnectBindTcp(c, c->tcp.host, c->tcp.port, - c->timeout, c->tcp.source_addr); + ret = redisContextConnectBindTcp(c, c->tcp.host, c->tcp.port, + c->connect_timeout, c->tcp.source_addr); } else if (c->connection_type == REDIS_CONN_UNIX) { - return redisContextConnectUnix(c, c->unix_sock.path, c->timeout); + ret = redisContextConnectUnix(c, c->unix_sock.path, c->connect_timeout); } else { /* Something bad happened here and shouldn't have. There isn't enough information in the context to reconnect. */ __redisSetError(c,REDIS_ERR_OTHER,"Not enough information to reconnect"); + ret = REDIS_ERR; } - return REDIS_ERR; -} + if (c->command_timeout != NULL && (c->flags & REDIS_BLOCK) && c->fd != REDIS_INVALID_FD) { + redisContextSetTimeout(c, *c->command_timeout); + } -/* Connect to a Redis instance. On error the field error in the returned - * context will be set to the return value of the error function. - * When no set of reply functions is given, the default set will be used. */ -redisContext *redisConnect(const char *ip, int port) { - redisContext *c; + return ret; +} - c = redisContextInit(); - if (c == NULL) +redisContext *redisConnectWithOptions(const redisOptions *options) { + redisContext *c = redisContextInit(); + if (c == NULL) { return NULL; + } + if (!(options->options & REDIS_OPT_NONBLOCK)) { + c->flags |= REDIS_BLOCK; + } + if (options->options & REDIS_OPT_REUSEADDR) { + c->flags |= REDIS_REUSEADDR; + } + if (options->options & REDIS_OPT_NOAUTOFREE) { + c->flags |= REDIS_NO_AUTO_FREE; + } + if (options->options & REDIS_OPT_NOAUTOFREEREPLIES) { + c->flags |= REDIS_NO_AUTO_FREE_REPLIES; + } + if (options->options & REDIS_OPT_PREFER_IPV4) { + c->flags |= REDIS_PREFER_IPV4; + } + if (options->options & REDIS_OPT_PREFER_IPV6) { + c->flags |= REDIS_PREFER_IPV6; + } - c->flags |= REDIS_BLOCK; - redisContextConnectTcp(c,ip,port,NULL); - return c; -} + /* Set any user supplied RESP3 PUSH handler or use freeReplyObject + * as a default unless specifically flagged that we don't want one. */ + if (options->push_cb != NULL) + redisSetPushCallback(c, options->push_cb); + else if (!(options->options & REDIS_OPT_NO_PUSH_AUTOFREE)) + redisSetPushCallback(c, redisPushAutoFree); -redisContext *redisConnectWithTimeout(const char *ip, int port, const struct timeval tv) { - redisContext *c; + c->privdata = options->privdata; + c->free_privdata = options->free_privdata; - c = redisContextInit(); - if (c == NULL) + if (redisContextUpdateConnectTimeout(c, options->connect_timeout) != REDIS_OK || + redisContextUpdateCommandTimeout(c, options->command_timeout) != REDIS_OK) { + __redisSetError(c, REDIS_ERR_OOM, "Out of memory"); + return c; + } + + if (options->type == REDIS_CONN_TCP) { + redisContextConnectBindTcp(c, options->endpoint.tcp.ip, + options->endpoint.tcp.port, options->connect_timeout, + options->endpoint.tcp.source_addr); + } else if (options->type == REDIS_CONN_UNIX) { + redisContextConnectUnix(c, options->endpoint.unix_socket, + options->connect_timeout); + } else if (options->type == REDIS_CONN_USERFD) { + c->fd = options->endpoint.fd; + c->flags |= REDIS_CONNECTED; + } else { + redisFree(c); return NULL; + } + + if (c->err == 0 && c->fd != REDIS_INVALID_FD && + options->command_timeout != NULL && (c->flags & REDIS_BLOCK)) + { + redisContextSetTimeout(c, *options->command_timeout); + } - c->flags |= REDIS_BLOCK; - redisContextConnectTcp(c,ip,port,&tv); return c; } -redisContext *redisConnectNonBlock(const char *ip, int port) { - redisContext *c; +/* Connect to a Redis instance. On error the field error in the returned + * context will be set to the return value of the error function. + * When no set of reply functions is given, the default set will be used. */ +redisContext *redisConnect(const char *ip, int port) { + redisOptions options = {0}; + REDIS_OPTIONS_SET_TCP(&options, ip, port); + return redisConnectWithOptions(&options); +} - c = redisContextInit(); - if (c == NULL) - return NULL; +redisContext *redisConnectWithTimeout(const char *ip, int port, const struct timeval tv) { + redisOptions options = {0}; + REDIS_OPTIONS_SET_TCP(&options, ip, port); + options.connect_timeout = &tv; + return redisConnectWithOptions(&options); +} - c->flags &= ~REDIS_BLOCK; - redisContextConnectTcp(c,ip,port,NULL); - return c; +redisContext *redisConnectNonBlock(const char *ip, int port) { + redisOptions options = {0}; + REDIS_OPTIONS_SET_TCP(&options, ip, port); + options.options |= REDIS_OPT_NONBLOCK; + return redisConnectWithOptions(&options); } redisContext *redisConnectBindNonBlock(const char *ip, int port, const char *source_addr) { - redisContext *c = redisContextInit(); - if (c == NULL) - return NULL; - c->flags &= ~REDIS_BLOCK; - redisContextConnectBindTcp(c,ip,port,NULL,source_addr); - return c; + redisOptions options = {0}; + REDIS_OPTIONS_SET_TCP(&options, ip, port); + options.endpoint.tcp.source_addr = source_addr; + options.options |= REDIS_OPT_NONBLOCK; + return redisConnectWithOptions(&options); } redisContext *redisConnectBindNonBlockWithReuse(const char *ip, int port, const char *source_addr) { - redisContext *c = redisContextInit(); - if (c == NULL) - return NULL; - c->flags &= ~REDIS_BLOCK; - c->flags |= REDIS_REUSEADDR; - redisContextConnectBindTcp(c,ip,port,NULL,source_addr); - return c; + redisOptions options = {0}; + REDIS_OPTIONS_SET_TCP(&options, ip, port); + options.endpoint.tcp.source_addr = source_addr; + options.options |= REDIS_OPT_NONBLOCK|REDIS_OPT_REUSEADDR; + return redisConnectWithOptions(&options); } redisContext *redisConnectUnix(const char *path) { - redisContext *c; - - c = redisContextInit(); - if (c == NULL) - return NULL; - - c->flags |= REDIS_BLOCK; - redisContextConnectUnix(c,path,NULL); - return c; + redisOptions options = {0}; + REDIS_OPTIONS_SET_UNIX(&options, path); + return redisConnectWithOptions(&options); } redisContext *redisConnectUnixWithTimeout(const char *path, const struct timeval tv) { - redisContext *c; - - c = redisContextInit(); - if (c == NULL) - return NULL; - - c->flags |= REDIS_BLOCK; - redisContextConnectUnix(c,path,&tv); - return c; + redisOptions options = {0}; + REDIS_OPTIONS_SET_UNIX(&options, path); + options.connect_timeout = &tv; + return redisConnectWithOptions(&options); } redisContext *redisConnectUnixNonBlock(const char *path) { - redisContext *c; - - c = redisContextInit(); - if (c == NULL) - return NULL; - - c->flags &= ~REDIS_BLOCK; - redisContextConnectUnix(c,path,NULL); - return c; + redisOptions options = {0}; + REDIS_OPTIONS_SET_UNIX(&options, path); + options.options |= REDIS_OPT_NONBLOCK; + return redisConnectWithOptions(&options); } -redisContext *redisConnectFd(int fd) { - redisContext *c; - - c = redisContextInit(); - if (c == NULL) - return NULL; - - c->fd = fd; - c->flags |= REDIS_BLOCK | REDIS_CONNECTED; - return c; +redisContext *redisConnectFd(redisFD fd) { + redisOptions options = {0}; + options.type = REDIS_CONN_USERFD; + options.endpoint.fd = fd; + return redisConnectWithOptions(&options); } /* Set read/write timeout on a blocking socket. */ @@ -783,17 +944,31 @@ int redisSetTimeout(redisContext *c, const struct timeval tv) { return REDIS_ERR; } +int redisEnableKeepAliveWithInterval(redisContext *c, int interval) { + return redisKeepAlive(c, interval); +} + /* Enable connection KeepAlive. */ int redisEnableKeepAlive(redisContext *c) { - if (redisKeepAlive(c, REDIS_KEEPALIVE_INTERVAL) != REDIS_OK) - return REDIS_ERR; - return REDIS_OK; + return redisKeepAlive(c, REDIS_KEEPALIVE_INTERVAL); +} + +/* Set the socket option TCP_USER_TIMEOUT. */ +int redisSetTcpUserTimeout(redisContext *c, unsigned int timeout) { + return redisContextSetTcpUserTimeout(c, timeout); +} + +/* Set a user provided RESP3 PUSH handler and return any old one set. */ +redisPushFn *redisSetPushCallback(redisContext *c, redisPushFn *fn) { + redisPushFn *old = c->push_cb; + c->push_cb = fn; + return old; } /* Use this function to handle a read event on the descriptor. It will try * and read some bytes from the socket and feed them to the reply parser. * - * After this function is called, you may use redisContextReadReply to + * After this function is called, you may use redisGetReplyFromReader to * see if there is a reply available. */ int redisBufferRead(redisContext *c) { char buf[1024*16]; @@ -803,22 +978,13 @@ int redisBufferRead(redisContext *c) { if (c->err) return REDIS_ERR; - nread = read(c->fd,buf,sizeof(buf)); - if (nread == -1) { - if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) { - /* Try again later */ - } else { - __redisSetError(c,REDIS_ERR_IO,NULL); - return REDIS_ERR; - } - } else if (nread == 0) { - __redisSetError(c,REDIS_ERR_EOF,"Server closed the connection"); + nread = c->funcs->read(c, buf, sizeof(buf)); + if (nread < 0) { + return REDIS_ERR; + } + if (nread > 0 && redisReaderFeed(c->reader, buf, nread) != REDIS_OK) { + __redisSetError(c, c->reader->err, c->reader->errstr); return REDIS_ERR; - } else { - if (redisReaderFeed(c->reader,buf,nread) != REDIS_OK) { - __redisSetError(c,c->reader->err,c->reader->errstr); - return REDIS_ERR; - } } return REDIS_OK; } @@ -829,45 +995,68 @@ int redisBufferRead(redisContext *c) { * successfully written to the socket. When the buffer is empty after the * write operation, "done" is set to 1 (if given). * - * Returns REDIS_ERR if an error occurred trying to write and sets - * c->errstr to hold the appropriate error string. + * Returns REDIS_ERR if an unrecoverable error occurred in the underlying + * c->funcs->write function. */ int redisBufferWrite(redisContext *c, int *done) { - int nwritten; /* Return early when the context has seen an error. */ if (c->err) return REDIS_ERR; if (sdslen(c->obuf) > 0) { - nwritten = write(c->fd,c->obuf,sdslen(c->obuf)); - if (nwritten == -1) { - if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) { - /* Try again later */ - } else { - __redisSetError(c,REDIS_ERR_IO,NULL); - return REDIS_ERR; - } + ssize_t nwritten = c->funcs->write(c); + if (nwritten < 0) { + return REDIS_ERR; } else if (nwritten > 0) { - if (nwritten == (signed)sdslen(c->obuf)) { + if (nwritten == (ssize_t)sdslen(c->obuf)) { sdsfree(c->obuf); c->obuf = sdsempty(); + if (c->obuf == NULL) + goto oom; } else { - sdsrange(c->obuf,nwritten,-1); + if (sdsrange(c->obuf,nwritten,-1) < 0) goto oom; } } } if (done != NULL) *done = (sdslen(c->obuf) == 0); return REDIS_OK; + +oom: + __redisSetError(c, REDIS_ERR_OOM, "Out of memory"); + return REDIS_ERR; +} + +/* Internal helper that returns 1 if the reply was a RESP3 PUSH + * message and we handled it with a user-provided callback. */ +static int redisHandledPushReply(redisContext *c, void *reply) { + if (reply && c->push_cb && redisIsPushReply(reply)) { + c->push_cb(c->privdata, reply); + return 1; + } + + return 0; } -/* Internal helper function to try and get a reply from the reader, - * or set an error in the context otherwise. */ +/* Get a reply from our reader or set an error in the context. */ int redisGetReplyFromReader(redisContext *c, void **reply) { - if (redisReaderGetReply(c->reader,reply) == REDIS_ERR) { + if (redisReaderGetReply(c->reader, reply) == REDIS_ERR) { __redisSetError(c,c->reader->err,c->reader->errstr); return REDIS_ERR; } + + return REDIS_OK; +} + +/* Internal helper to get the next reply from our reader while handling + * any PUSH messages we encounter along the way. This is separate from + * redisGetReplyFromReader so as to not change its behavior. */ +static int redisNextInBandReplyFromReader(redisContext *c, void **reply) { + do { + if (redisGetReplyFromReader(c, reply) == REDIS_ERR) + return REDIS_ERR; + } while (redisHandledPushReply(c, *reply)); + return REDIS_OK; } @@ -876,7 +1065,7 @@ int redisGetReply(redisContext *c, void **reply) { void *aux = NULL; /* Try to read pending replies */ - if (redisGetReplyFromReader(c,&aux) == REDIS_ERR) + if (redisNextInBandReplyFromReader(c,&aux) == REDIS_ERR) return REDIS_ERR; /* For the blocking context, flush output buffer and read reply */ @@ -891,13 +1080,19 @@ int redisGetReply(redisContext *c, void **reply) { do { if (redisBufferRead(c) == REDIS_ERR) return REDIS_ERR; - if (redisGetReplyFromReader(c,&aux) == REDIS_ERR) + + if (redisNextInBandReplyFromReader(c,&aux) == REDIS_ERR) return REDIS_ERR; } while (aux == NULL); } - /* Set reply object */ - if (reply != NULL) *reply = aux; + /* Set reply or free it if we were passed NULL */ + if (reply != NULL) { + *reply = aux; + } else { + freeReplyObject(aux); + } + return REDIS_OK; } @@ -944,11 +1139,11 @@ int redisvAppendCommand(redisContext *c, const char *format, va_list ap) { } if (__redisAppendCommand(c,cmd,len) != REDIS_OK) { - free(cmd); + hi_free(cmd); return REDIS_ERR; } - free(cmd); + hi_free(cmd); return REDIS_OK; } @@ -964,7 +1159,7 @@ int redisAppendCommand(redisContext *c, const char *format, ...) { int redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) { sds cmd; - int len; + long long len; len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen); if (len == -1) { @@ -1011,9 +1206,8 @@ void *redisvCommand(redisContext *c, const char *format, va_list ap) { void *redisCommand(redisContext *c, const char *format, ...) { va_list ap; - void *reply = NULL; va_start(ap,format); - reply = redisvCommand(c,format,ap); + void *reply = redisvCommand(c,format,ap); va_end(ap); return reply; } |