aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/hiredis/hiredis.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/hiredis/hiredis.c')
-rw-r--r--contrib/hiredis/hiredis.c602
1 files changed, 398 insertions, 204 deletions
diff --git a/contrib/hiredis/hiredis.c b/contrib/hiredis/hiredis.c
index 0f87bc323..446ceb1e6 100644
--- a/contrib/hiredis/hiredis.c
+++ b/contrib/hiredis/hiredis.c
@@ -34,7 +34,6 @@
#include "fmacros.h"
#include <string.h>
#include <stdlib.h>
-#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <ctype.h>
@@ -42,12 +41,28 @@
#include "hiredis.h"
#include "net.h"
#include "sds.h"
+#include "async.h"
+#include "win32.h"
+
+extern int redisContextUpdateConnectTimeout(redisContext *c, const struct timeval *timeout);
+extern int redisContextUpdateCommandTimeout(redisContext *c, const struct timeval *timeout);
+
+static redisContextFuncs redisContextDefaultFuncs = {
+ .close = redisNetClose,
+ .free_privctx = NULL,
+ .async_read = redisAsyncRead,
+ .async_write = redisAsyncWrite,
+ .read = redisNetRead,
+ .write = redisNetWrite
+};
static redisReply *createReplyObject(int type);
static void *createStringObject(const redisReadTask *task, char *str, size_t len);
-static void *createArrayObject(const redisReadTask *task, int elements);
+static void *createArrayObject(const redisReadTask *task, size_t elements);
static void *createIntegerObject(const redisReadTask *task, long long value);
+static void *createDoubleObject(const redisReadTask *task, double value, char *str, size_t len);
static void *createNilObject(const redisReadTask *task);
+static void *createBoolObject(const redisReadTask *task, int bval);
/* Default set of functions to build the reply. Keep in mind that such a
* function returning NULL is interpreted as OOM. */
@@ -55,13 +70,15 @@ static redisReplyObjectFunctions defaultFunctions = {
createStringObject,
createArrayObject,
createIntegerObject,
+ createDoubleObject,
createNilObject,
+ createBoolObject,
freeReplyObject
};
/* Create a reply object */
static redisReply *createReplyObject(int type) {
- redisReply *r = calloc(1,sizeof(*r));
+ redisReply *r = hi_calloc(1,sizeof(*r));
if (r == NULL)
return NULL;
@@ -80,23 +97,29 @@ void freeReplyObject(void *reply) {
switch(r->type) {
case REDIS_REPLY_INTEGER:
+ case REDIS_REPLY_NIL:
+ case REDIS_REPLY_BOOL:
break; /* Nothing to free */
case REDIS_REPLY_ARRAY:
+ case REDIS_REPLY_MAP:
+ case REDIS_REPLY_SET:
+ case REDIS_REPLY_PUSH:
if (r->element != NULL) {
for (j = 0; j < r->elements; j++)
- if (r->element[j] != NULL)
- freeReplyObject(r->element[j]);
- free(r->element);
+ freeReplyObject(r->element[j]);
+ hi_free(r->element);
}
break;
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STATUS:
case REDIS_REPLY_STRING:
- if (r->str != NULL)
- free(r->str);
+ case REDIS_REPLY_DOUBLE:
+ case REDIS_REPLY_VERB:
+ case REDIS_REPLY_BIGNUM:
+ hi_free(r->str);
break;
}
- free(r);
+ hi_free(r);
}
static void *createStringObject(const redisReadTask *task, char *str, size_t len) {
@@ -107,39 +130,56 @@ static void *createStringObject(const redisReadTask *task, char *str, size_t len
if (r == NULL)
return NULL;
- buf = malloc(len+1);
- if (buf == NULL) {
- freeReplyObject(r);
- return NULL;
- }
-
assert(task->type == REDIS_REPLY_ERROR ||
task->type == REDIS_REPLY_STATUS ||
- task->type == REDIS_REPLY_STRING);
+ task->type == REDIS_REPLY_STRING ||
+ task->type == REDIS_REPLY_VERB ||
+ task->type == REDIS_REPLY_BIGNUM);
/* Copy string value */
- memcpy(buf,str,len);
- buf[len] = '\0';
+ if (task->type == REDIS_REPLY_VERB) {
+ buf = hi_malloc(len-4+1); /* Skip 4 bytes of verbatim type header. */
+ if (buf == NULL) goto oom;
+
+ memcpy(r->vtype,str,3);
+ r->vtype[3] = '\0';
+ memcpy(buf,str+4,len-4);
+ buf[len-4] = '\0';
+ r->len = len - 4;
+ } else {
+ buf = hi_malloc(len+1);
+ if (buf == NULL) goto oom;
+
+ memcpy(buf,str,len);
+ buf[len] = '\0';
+ r->len = len;
+ }
r->str = buf;
- r->len = len;
if (task->parent) {
parent = task->parent->obj;
- assert(parent->type == REDIS_REPLY_ARRAY);
+ assert(parent->type == REDIS_REPLY_ARRAY ||
+ parent->type == REDIS_REPLY_MAP ||
+ parent->type == REDIS_REPLY_SET ||
+ parent->type == REDIS_REPLY_PUSH);
parent->element[task->idx] = r;
}
return r;
+
+oom:
+ freeReplyObject(r);
+ return NULL;
}
-static void *createArrayObject(const redisReadTask *task, int elements) {
+static void *createArrayObject(const redisReadTask *task, size_t elements) {
redisReply *r, *parent;
- r = createReplyObject(REDIS_REPLY_ARRAY);
+ r = createReplyObject(task->type);
if (r == NULL)
return NULL;
if (elements > 0) {
- r->element = calloc(elements,sizeof(redisReply*));
+ r->element = hi_calloc(elements,sizeof(redisReply*));
if (r->element == NULL) {
freeReplyObject(r);
return NULL;
@@ -150,7 +190,10 @@ static void *createArrayObject(const redisReadTask *task, int elements) {
if (task->parent) {
parent = task->parent->obj;
- assert(parent->type == REDIS_REPLY_ARRAY);
+ assert(parent->type == REDIS_REPLY_ARRAY ||
+ parent->type == REDIS_REPLY_MAP ||
+ parent->type == REDIS_REPLY_SET ||
+ parent->type == REDIS_REPLY_PUSH);
parent->element[task->idx] = r;
}
return r;
@@ -167,7 +210,47 @@ static void *createIntegerObject(const redisReadTask *task, long long value) {
if (task->parent) {
parent = task->parent->obj;
- assert(parent->type == REDIS_REPLY_ARRAY);
+ assert(parent->type == REDIS_REPLY_ARRAY ||
+ parent->type == REDIS_REPLY_MAP ||
+ parent->type == REDIS_REPLY_SET ||
+ parent->type == REDIS_REPLY_PUSH);
+ parent->element[task->idx] = r;
+ }
+ return r;
+}
+
+static void *createDoubleObject(const redisReadTask *task, double value, char *str, size_t len) {
+ redisReply *r, *parent;
+
+ if (len == SIZE_MAX) // Prevents hi_malloc(0) if len equals to SIZE_MAX
+ return NULL;
+
+ r = createReplyObject(REDIS_REPLY_DOUBLE);
+ if (r == NULL)
+ return NULL;
+
+ r->dval = value;
+ r->str = hi_malloc(len+1);
+ if (r->str == NULL) {
+ freeReplyObject(r);
+ return NULL;
+ }
+
+ /* The double reply also has the original protocol string representing a
+ * double as a null terminated string. This way the caller does not need
+ * to format back for string conversion, especially since Redis does efforts
+ * to make the string more human readable avoiding the calssical double
+ * decimal string conversion artifacts. */
+ memcpy(r->str, str, len);
+ r->str[len] = '\0';
+ r->len = len;
+
+ if (task->parent) {
+ parent = task->parent->obj;
+ assert(parent->type == REDIS_REPLY_ARRAY ||
+ parent->type == REDIS_REPLY_MAP ||
+ parent->type == REDIS_REPLY_SET ||
+ parent->type == REDIS_REPLY_PUSH);
parent->element[task->idx] = r;
}
return r;
@@ -182,7 +265,30 @@ static void *createNilObject(const redisReadTask *task) {
if (task->parent) {
parent = task->parent->obj;
- assert(parent->type == REDIS_REPLY_ARRAY);
+ assert(parent->type == REDIS_REPLY_ARRAY ||
+ parent->type == REDIS_REPLY_MAP ||
+ parent->type == REDIS_REPLY_SET ||
+ parent->type == REDIS_REPLY_PUSH);
+ parent->element[task->idx] = r;
+ }
+ return r;
+}
+
+static void *createBoolObject(const redisReadTask *task, int bval) {
+ redisReply *r, *parent;
+
+ r = createReplyObject(REDIS_REPLY_BOOL);
+ if (r == NULL)
+ return NULL;
+
+ r->integer = bval != 0;
+
+ if (task->parent) {
+ parent = task->parent->obj;
+ assert(parent->type == REDIS_REPLY_ARRAY ||
+ parent->type == REDIS_REPLY_MAP ||
+ parent->type == REDIS_REPLY_SET ||
+ parent->type == REDIS_REPLY_PUSH);
parent->element[task->idx] = r;
}
return r;
@@ -232,7 +338,7 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) {
if (*c != '%' || c[1] == '\0') {
if (*c == ' ') {
if (touched) {
- newargv = realloc(curargv,sizeof(char*)*(argc+1));
+ newargv = hi_realloc(curargv,sizeof(char*)*(argc+1));
if (newargv == NULL) goto memory_err;
curargv = newargv;
curargv[argc++] = curarg;
@@ -286,17 +392,22 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) {
while (*_p != '\0' && strchr(flags,*_p) != NULL) _p++;
/* Field width */
- while (*_p != '\0' && isdigit(*_p)) _p++;
+ while (*_p != '\0' && isdigit((int) *_p)) _p++;
/* Precision */
if (*_p == '.') {
_p++;
- while (*_p != '\0' && isdigit(*_p)) _p++;
+ while (*_p != '\0' && isdigit((int) *_p)) _p++;
}
/* Copy va_list before consuming with va_arg */
va_copy(_cpy,ap);
+ /* Make sure we have more characters otherwise strchr() accepts
+ * '\0' as an integer specifier. This is checked after above
+ * va_copy() to avoid UB in fmt_invalid's call to va_end(). */
+ if (*_p == '\0') goto fmt_invalid;
+
/* Integer conversion (without modifiers) */
if (strchr(intfmts,*_p) != NULL) {
va_arg(ap,int);
@@ -375,13 +486,15 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) {
touched = 1;
c++;
+ if (*c == '\0')
+ break;
}
c++;
}
/* Add the last argument if needed */
if (touched) {
- newargv = realloc(curargv,sizeof(char*)*(argc+1));
+ newargv = hi_realloc(curargv,sizeof(char*)*(argc+1));
if (newargv == NULL) goto memory_err;
curargv = newargv;
curargv[argc++] = curarg;
@@ -397,7 +510,7 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) {
totlen += 1+countDigits(argc)+2;
/* Build the command at protocol level */
- cmd = malloc(totlen+1);
+ cmd = hi_malloc(totlen+1);
if (cmd == NULL) goto memory_err;
pos = sprintf(cmd,"*%d\r\n",argc);
@@ -412,7 +525,7 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) {
assert(pos == totlen);
cmd[pos] = '\0';
- free(curargv);
+ hi_free(curargv);
*target = cmd;
return totlen;
@@ -428,15 +541,11 @@ cleanup:
if (curargv) {
while(argc--)
sdsfree(curargv[argc]);
- free(curargv);
+ hi_free(curargv);
}
sdsfree(curarg);
-
- /* No need to check cmd since it is the last statement that can fail,
- * but do it anyway to be as defensive as possible. */
- if (cmd != NULL)
- free(cmd);
+ hi_free(cmd);
return error_type;
}
@@ -474,13 +583,12 @@ int redisFormatCommand(char **target, const char *format, ...) {
* lengths. If the latter is set to NULL, strlen will be used to compute the
* argument lengths.
*/
-int redisFormatSdsCommandArgv(sds *target, int argc, const char **argv,
- const size_t *argvlen)
+long long redisFormatSdsCommandArgv(sds *target, int argc, const char **argv,
+ const size_t *argvlen)
{
- sds cmd;
- unsigned long long totlen;
+ sds cmd, aux;
+ unsigned long long totlen, len;
int j;
- size_t len;
/* Abort on a NULL target */
if (target == NULL)
@@ -499,15 +607,19 @@ int redisFormatSdsCommandArgv(sds *target, int argc, const char **argv,
return -1;
/* We already know how much storage we need */
- cmd = sdsMakeRoomFor(cmd, totlen);
- if (cmd == NULL)
+ aux = sdsMakeRoomFor(cmd, totlen);
+ if (aux == NULL) {
+ sdsfree(cmd);
return -1;
+ }
+
+ cmd = aux;
/* Construct command */
cmd = sdscatfmt(cmd, "*%i\r\n", argc);
for (j=0; j < argc; j++) {
len = argvlen ? argvlen[j] : strlen(argv[j]);
- cmd = sdscatfmt(cmd, "$%T\r\n", len);
+ cmd = sdscatfmt(cmd, "$%U\r\n", len);
cmd = sdscatlen(cmd, argv[j], len);
cmd = sdscatlen(cmd, "\r\n", sizeof("\r\n")-1);
}
@@ -527,11 +639,11 @@ void redisFreeSdsCommand(sds cmd) {
* lengths. If the latter is set to NULL, strlen will be used to compute the
* argument lengths.
*/
-int redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen) {
+long long redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen) {
char *cmd = NULL; /* final command */
- int pos; /* position in final command */
- size_t len;
- int totlen, j;
+ size_t pos; /* position in final command */
+ size_t len, totlen;
+ int j;
/* Abort on a NULL target */
if (target == NULL)
@@ -545,7 +657,7 @@ int redisFormatCommandArgv(char **target, int argc, const char **argv, const siz
}
/* Build the command at protocol level */
- cmd = malloc(totlen+1);
+ cmd = hi_malloc(totlen+1);
if (cmd == NULL)
return -1;
@@ -566,7 +678,7 @@ int redisFormatCommandArgv(char **target, int argc, const char **argv, const siz
}
void redisFreeCommand(char *cmd) {
- free(cmd);
+ hi_free(cmd);
}
void __redisSetError(redisContext *c, int type, const char *str) {
@@ -581,7 +693,7 @@ void __redisSetError(redisContext *c, int type, const char *str) {
} else {
/* Only REDIS_ERR_IO may lack a description! */
assert(type == REDIS_ERR_IO);
- __redis_strerror_r(errno, c->errstr, sizeof(c->errstr));
+ strerror_r(errno, c->errstr, sizeof(c->errstr));
}
}
@@ -589,21 +701,23 @@ redisReader *redisReaderCreate(void) {
return redisReaderCreateWithFunctions(&defaultFunctions);
}
+static void redisPushAutoFree(void *privdata, void *reply) {
+ (void)privdata;
+ freeReplyObject(reply);
+}
+
static redisContext *redisContextInit(void) {
redisContext *c;
- c = calloc(1,sizeof(redisContext));
+ c = hi_calloc(1, sizeof(*c));
if (c == NULL)
return NULL;
- c->err = 0;
- c->errstr[0] = '\0';
+ c->funcs = &redisContextDefaultFuncs;
+
c->obuf = sdsempty();
c->reader = redisReaderCreate();
- c->tcp.host = NULL;
- c->tcp.source_addr = NULL;
- c->unix_sock.path = NULL;
- c->timeout = NULL;
+ c->fd = REDIS_INVALID_FD;
if (c->obuf == NULL || c->reader == NULL) {
redisFree(c);
@@ -616,26 +730,33 @@ static redisContext *redisContextInit(void) {
void redisFree(redisContext *c) {
if (c == NULL)
return;
- if (c->fd > 0)
- close(c->fd);
- if (c->obuf != NULL)
- sdsfree(c->obuf);
- if (c->reader != NULL)
- redisReaderFree(c->reader);
- if (c->tcp.host)
- free(c->tcp.host);
- if (c->tcp.source_addr)
- free(c->tcp.source_addr);
- if (c->unix_sock.path)
- free(c->unix_sock.path);
- if (c->timeout)
- free(c->timeout);
- free(c);
+
+ if (c->funcs && c->funcs->close) {
+ c->funcs->close(c);
+ }
+
+ sdsfree(c->obuf);
+ redisReaderFree(c->reader);
+ hi_free(c->tcp.host);
+ hi_free(c->tcp.source_addr);
+ hi_free(c->unix_sock.path);
+ hi_free(c->connect_timeout);
+ hi_free(c->command_timeout);
+ hi_free(c->saddr);
+
+ if (c->privdata && c->free_privdata)
+ c->free_privdata(c->privdata);
+
+ if (c->funcs && c->funcs->free_privctx)
+ c->funcs->free_privctx(c->privctx);
+
+ memset(c, 0xff, sizeof(*c));
+ hi_free(c);
}
-int redisFreeKeepFd(redisContext *c) {
- int fd = c->fd;
- c->fd = -1;
+redisFD redisFreeKeepFd(redisContext *c) {
+ redisFD fd = c->fd;
+ c->fd = REDIS_INVALID_FD;
redisFree(c);
return fd;
}
@@ -644,8 +765,13 @@ int redisReconnect(redisContext *c) {
c->err = 0;
memset(c->errstr, '\0', strlen(c->errstr));
- if (c->fd > 0) {
- close(c->fd);
+ if (c->privctx && c->funcs->free_privctx) {
+ c->funcs->free_privctx(c->privctx);
+ c->privctx = NULL;
+ }
+
+ if (c->funcs && c->funcs->close) {
+ c->funcs->close(c);
}
sdsfree(c->obuf);
@@ -654,126 +780,161 @@ int redisReconnect(redisContext *c) {
c->obuf = sdsempty();
c->reader = redisReaderCreate();
+ if (c->obuf == NULL || c->reader == NULL) {
+ __redisSetError(c, REDIS_ERR_OOM, "Out of memory");
+ return REDIS_ERR;
+ }
+
+ int ret = REDIS_ERR;
if (c->connection_type == REDIS_CONN_TCP) {
- return redisContextConnectBindTcp(c, c->tcp.host, c->tcp.port,
- c->timeout, c->tcp.source_addr);
+ ret = redisContextConnectBindTcp(c, c->tcp.host, c->tcp.port,
+ c->connect_timeout, c->tcp.source_addr);
} else if (c->connection_type == REDIS_CONN_UNIX) {
- return redisContextConnectUnix(c, c->unix_sock.path, c->timeout);
+ ret = redisContextConnectUnix(c, c->unix_sock.path, c->connect_timeout);
} else {
/* Something bad happened here and shouldn't have. There isn't
enough information in the context to reconnect. */
__redisSetError(c,REDIS_ERR_OTHER,"Not enough information to reconnect");
+ ret = REDIS_ERR;
}
- return REDIS_ERR;
-}
+ if (c->command_timeout != NULL && (c->flags & REDIS_BLOCK) && c->fd != REDIS_INVALID_FD) {
+ redisContextSetTimeout(c, *c->command_timeout);
+ }
-/* Connect to a Redis instance. On error the field error in the returned
- * context will be set to the return value of the error function.
- * When no set of reply functions is given, the default set will be used. */
-redisContext *redisConnect(const char *ip, int port) {
- redisContext *c;
+ return ret;
+}
- c = redisContextInit();
- if (c == NULL)
+redisContext *redisConnectWithOptions(const redisOptions *options) {
+ redisContext *c = redisContextInit();
+ if (c == NULL) {
return NULL;
+ }
+ if (!(options->options & REDIS_OPT_NONBLOCK)) {
+ c->flags |= REDIS_BLOCK;
+ }
+ if (options->options & REDIS_OPT_REUSEADDR) {
+ c->flags |= REDIS_REUSEADDR;
+ }
+ if (options->options & REDIS_OPT_NOAUTOFREE) {
+ c->flags |= REDIS_NO_AUTO_FREE;
+ }
+ if (options->options & REDIS_OPT_NOAUTOFREEREPLIES) {
+ c->flags |= REDIS_NO_AUTO_FREE_REPLIES;
+ }
+ if (options->options & REDIS_OPT_PREFER_IPV4) {
+ c->flags |= REDIS_PREFER_IPV4;
+ }
+ if (options->options & REDIS_OPT_PREFER_IPV6) {
+ c->flags |= REDIS_PREFER_IPV6;
+ }
- c->flags |= REDIS_BLOCK;
- redisContextConnectTcp(c,ip,port,NULL);
- return c;
-}
+ /* Set any user supplied RESP3 PUSH handler or use freeReplyObject
+ * as a default unless specifically flagged that we don't want one. */
+ if (options->push_cb != NULL)
+ redisSetPushCallback(c, options->push_cb);
+ else if (!(options->options & REDIS_OPT_NO_PUSH_AUTOFREE))
+ redisSetPushCallback(c, redisPushAutoFree);
-redisContext *redisConnectWithTimeout(const char *ip, int port, const struct timeval tv) {
- redisContext *c;
+ c->privdata = options->privdata;
+ c->free_privdata = options->free_privdata;
- c = redisContextInit();
- if (c == NULL)
+ if (redisContextUpdateConnectTimeout(c, options->connect_timeout) != REDIS_OK ||
+ redisContextUpdateCommandTimeout(c, options->command_timeout) != REDIS_OK) {
+ __redisSetError(c, REDIS_ERR_OOM, "Out of memory");
+ return c;
+ }
+
+ if (options->type == REDIS_CONN_TCP) {
+ redisContextConnectBindTcp(c, options->endpoint.tcp.ip,
+ options->endpoint.tcp.port, options->connect_timeout,
+ options->endpoint.tcp.source_addr);
+ } else if (options->type == REDIS_CONN_UNIX) {
+ redisContextConnectUnix(c, options->endpoint.unix_socket,
+ options->connect_timeout);
+ } else if (options->type == REDIS_CONN_USERFD) {
+ c->fd = options->endpoint.fd;
+ c->flags |= REDIS_CONNECTED;
+ } else {
+ redisFree(c);
return NULL;
+ }
+
+ if (c->err == 0 && c->fd != REDIS_INVALID_FD &&
+ options->command_timeout != NULL && (c->flags & REDIS_BLOCK))
+ {
+ redisContextSetTimeout(c, *options->command_timeout);
+ }
- c->flags |= REDIS_BLOCK;
- redisContextConnectTcp(c,ip,port,&tv);
return c;
}
-redisContext *redisConnectNonBlock(const char *ip, int port) {
- redisContext *c;
+/* Connect to a Redis instance. On error the field error in the returned
+ * context will be set to the return value of the error function.
+ * When no set of reply functions is given, the default set will be used. */
+redisContext *redisConnect(const char *ip, int port) {
+ redisOptions options = {0};
+ REDIS_OPTIONS_SET_TCP(&options, ip, port);
+ return redisConnectWithOptions(&options);
+}
- c = redisContextInit();
- if (c == NULL)
- return NULL;
+redisContext *redisConnectWithTimeout(const char *ip, int port, const struct timeval tv) {
+ redisOptions options = {0};
+ REDIS_OPTIONS_SET_TCP(&options, ip, port);
+ options.connect_timeout = &tv;
+ return redisConnectWithOptions(&options);
+}
- c->flags &= ~REDIS_BLOCK;
- redisContextConnectTcp(c,ip,port,NULL);
- return c;
+redisContext *redisConnectNonBlock(const char *ip, int port) {
+ redisOptions options = {0};
+ REDIS_OPTIONS_SET_TCP(&options, ip, port);
+ options.options |= REDIS_OPT_NONBLOCK;
+ return redisConnectWithOptions(&options);
}
redisContext *redisConnectBindNonBlock(const char *ip, int port,
const char *source_addr) {
- redisContext *c = redisContextInit();
- if (c == NULL)
- return NULL;
- c->flags &= ~REDIS_BLOCK;
- redisContextConnectBindTcp(c,ip,port,NULL,source_addr);
- return c;
+ redisOptions options = {0};
+ REDIS_OPTIONS_SET_TCP(&options, ip, port);
+ options.endpoint.tcp.source_addr = source_addr;
+ options.options |= REDIS_OPT_NONBLOCK;
+ return redisConnectWithOptions(&options);
}
redisContext *redisConnectBindNonBlockWithReuse(const char *ip, int port,
const char *source_addr) {
- redisContext *c = redisContextInit();
- if (c == NULL)
- return NULL;
- c->flags &= ~REDIS_BLOCK;
- c->flags |= REDIS_REUSEADDR;
- redisContextConnectBindTcp(c,ip,port,NULL,source_addr);
- return c;
+ redisOptions options = {0};
+ REDIS_OPTIONS_SET_TCP(&options, ip, port);
+ options.endpoint.tcp.source_addr = source_addr;
+ options.options |= REDIS_OPT_NONBLOCK|REDIS_OPT_REUSEADDR;
+ return redisConnectWithOptions(&options);
}
redisContext *redisConnectUnix(const char *path) {
- redisContext *c;
-
- c = redisContextInit();
- if (c == NULL)
- return NULL;
-
- c->flags |= REDIS_BLOCK;
- redisContextConnectUnix(c,path,NULL);
- return c;
+ redisOptions options = {0};
+ REDIS_OPTIONS_SET_UNIX(&options, path);
+ return redisConnectWithOptions(&options);
}
redisContext *redisConnectUnixWithTimeout(const char *path, const struct timeval tv) {
- redisContext *c;
-
- c = redisContextInit();
- if (c == NULL)
- return NULL;
-
- c->flags |= REDIS_BLOCK;
- redisContextConnectUnix(c,path,&tv);
- return c;
+ redisOptions options = {0};
+ REDIS_OPTIONS_SET_UNIX(&options, path);
+ options.connect_timeout = &tv;
+ return redisConnectWithOptions(&options);
}
redisContext *redisConnectUnixNonBlock(const char *path) {
- redisContext *c;
-
- c = redisContextInit();
- if (c == NULL)
- return NULL;
-
- c->flags &= ~REDIS_BLOCK;
- redisContextConnectUnix(c,path,NULL);
- return c;
+ redisOptions options = {0};
+ REDIS_OPTIONS_SET_UNIX(&options, path);
+ options.options |= REDIS_OPT_NONBLOCK;
+ return redisConnectWithOptions(&options);
}
-redisContext *redisConnectFd(int fd) {
- redisContext *c;
-
- c = redisContextInit();
- if (c == NULL)
- return NULL;
-
- c->fd = fd;
- c->flags |= REDIS_BLOCK | REDIS_CONNECTED;
- return c;
+redisContext *redisConnectFd(redisFD fd) {
+ redisOptions options = {0};
+ options.type = REDIS_CONN_USERFD;
+ options.endpoint.fd = fd;
+ return redisConnectWithOptions(&options);
}
/* Set read/write timeout on a blocking socket. */
@@ -783,17 +944,31 @@ int redisSetTimeout(redisContext *c, const struct timeval tv) {
return REDIS_ERR;
}
+int redisEnableKeepAliveWithInterval(redisContext *c, int interval) {
+ return redisKeepAlive(c, interval);
+}
+
/* Enable connection KeepAlive. */
int redisEnableKeepAlive(redisContext *c) {
- if (redisKeepAlive(c, REDIS_KEEPALIVE_INTERVAL) != REDIS_OK)
- return REDIS_ERR;
- return REDIS_OK;
+ return redisKeepAlive(c, REDIS_KEEPALIVE_INTERVAL);
+}
+
+/* Set the socket option TCP_USER_TIMEOUT. */
+int redisSetTcpUserTimeout(redisContext *c, unsigned int timeout) {
+ return redisContextSetTcpUserTimeout(c, timeout);
+}
+
+/* Set a user provided RESP3 PUSH handler and return any old one set. */
+redisPushFn *redisSetPushCallback(redisContext *c, redisPushFn *fn) {
+ redisPushFn *old = c->push_cb;
+ c->push_cb = fn;
+ return old;
}
/* Use this function to handle a read event on the descriptor. It will try
* and read some bytes from the socket and feed them to the reply parser.
*
- * After this function is called, you may use redisContextReadReply to
+ * After this function is called, you may use redisGetReplyFromReader to
* see if there is a reply available. */
int redisBufferRead(redisContext *c) {
char buf[1024*16];
@@ -803,22 +978,13 @@ int redisBufferRead(redisContext *c) {
if (c->err)
return REDIS_ERR;
- nread = read(c->fd,buf,sizeof(buf));
- if (nread == -1) {
- if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
- /* Try again later */
- } else {
- __redisSetError(c,REDIS_ERR_IO,NULL);
- return REDIS_ERR;
- }
- } else if (nread == 0) {
- __redisSetError(c,REDIS_ERR_EOF,"Server closed the connection");
+ nread = c->funcs->read(c, buf, sizeof(buf));
+ if (nread < 0) {
+ return REDIS_ERR;
+ }
+ if (nread > 0 && redisReaderFeed(c->reader, buf, nread) != REDIS_OK) {
+ __redisSetError(c, c->reader->err, c->reader->errstr);
return REDIS_ERR;
- } else {
- if (redisReaderFeed(c->reader,buf,nread) != REDIS_OK) {
- __redisSetError(c,c->reader->err,c->reader->errstr);
- return REDIS_ERR;
- }
}
return REDIS_OK;
}
@@ -829,45 +995,68 @@ int redisBufferRead(redisContext *c) {
* successfully written to the socket. When the buffer is empty after the
* write operation, "done" is set to 1 (if given).
*
- * Returns REDIS_ERR if an error occurred trying to write and sets
- * c->errstr to hold the appropriate error string.
+ * Returns REDIS_ERR if an unrecoverable error occurred in the underlying
+ * c->funcs->write function.
*/
int redisBufferWrite(redisContext *c, int *done) {
- int nwritten;
/* Return early when the context has seen an error. */
if (c->err)
return REDIS_ERR;
if (sdslen(c->obuf) > 0) {
- nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
- if (nwritten == -1) {
- if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
- /* Try again later */
- } else {
- __redisSetError(c,REDIS_ERR_IO,NULL);
- return REDIS_ERR;
- }
+ ssize_t nwritten = c->funcs->write(c);
+ if (nwritten < 0) {
+ return REDIS_ERR;
} else if (nwritten > 0) {
- if (nwritten == (signed)sdslen(c->obuf)) {
+ if (nwritten == (ssize_t)sdslen(c->obuf)) {
sdsfree(c->obuf);
c->obuf = sdsempty();
+ if (c->obuf == NULL)
+ goto oom;
} else {
- sdsrange(c->obuf,nwritten,-1);
+ if (sdsrange(c->obuf,nwritten,-1) < 0) goto oom;
}
}
}
if (done != NULL) *done = (sdslen(c->obuf) == 0);
return REDIS_OK;
+
+oom:
+ __redisSetError(c, REDIS_ERR_OOM, "Out of memory");
+ return REDIS_ERR;
+}
+
+/* Internal helper that returns 1 if the reply was a RESP3 PUSH
+ * message and we handled it with a user-provided callback. */
+static int redisHandledPushReply(redisContext *c, void *reply) {
+ if (reply && c->push_cb && redisIsPushReply(reply)) {
+ c->push_cb(c->privdata, reply);
+ return 1;
+ }
+
+ return 0;
}
-/* Internal helper function to try and get a reply from the reader,
- * or set an error in the context otherwise. */
+/* Get a reply from our reader or set an error in the context. */
int redisGetReplyFromReader(redisContext *c, void **reply) {
- if (redisReaderGetReply(c->reader,reply) == REDIS_ERR) {
+ if (redisReaderGetReply(c->reader, reply) == REDIS_ERR) {
__redisSetError(c,c->reader->err,c->reader->errstr);
return REDIS_ERR;
}
+
+ return REDIS_OK;
+}
+
+/* Internal helper to get the next reply from our reader while handling
+ * any PUSH messages we encounter along the way. This is separate from
+ * redisGetReplyFromReader so as to not change its behavior. */
+static int redisNextInBandReplyFromReader(redisContext *c, void **reply) {
+ do {
+ if (redisGetReplyFromReader(c, reply) == REDIS_ERR)
+ return REDIS_ERR;
+ } while (redisHandledPushReply(c, *reply));
+
return REDIS_OK;
}
@@ -876,7 +1065,7 @@ int redisGetReply(redisContext *c, void **reply) {
void *aux = NULL;
/* Try to read pending replies */
- if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
+ if (redisNextInBandReplyFromReader(c,&aux) == REDIS_ERR)
return REDIS_ERR;
/* For the blocking context, flush output buffer and read reply */
@@ -891,13 +1080,19 @@ int redisGetReply(redisContext *c, void **reply) {
do {
if (redisBufferRead(c) == REDIS_ERR)
return REDIS_ERR;
- if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
+
+ if (redisNextInBandReplyFromReader(c,&aux) == REDIS_ERR)
return REDIS_ERR;
} while (aux == NULL);
}
- /* Set reply object */
- if (reply != NULL) *reply = aux;
+ /* Set reply or free it if we were passed NULL */
+ if (reply != NULL) {
+ *reply = aux;
+ } else {
+ freeReplyObject(aux);
+ }
+
return REDIS_OK;
}
@@ -944,11 +1139,11 @@ int redisvAppendCommand(redisContext *c, const char *format, va_list ap) {
}
if (__redisAppendCommand(c,cmd,len) != REDIS_OK) {
- free(cmd);
+ hi_free(cmd);
return REDIS_ERR;
}
- free(cmd);
+ hi_free(cmd);
return REDIS_OK;
}
@@ -964,7 +1159,7 @@ int redisAppendCommand(redisContext *c, const char *format, ...) {
int redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
sds cmd;
- int len;
+ long long len;
len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
if (len == -1) {
@@ -1011,9 +1206,8 @@ void *redisvCommand(redisContext *c, const char *format, va_list ap) {
void *redisCommand(redisContext *c, const char *format, ...) {
va_list ap;
- void *reply = NULL;
va_start(ap,format);
- reply = redisvCommand(c,format,ap);
+ void *reply = redisvCommand(c,format,ap);
va_end(ap);
return reply;
}