diff options
Diffstat (limited to 'contrib/hiredis/net.c')
-rw-r--r-- | contrib/hiredis/net.c | 408 |
1 files changed, 308 insertions, 100 deletions
diff --git a/contrib/hiredis/net.c b/contrib/hiredis/net.c index 97fd42c23..1e016384f 100644 --- a/contrib/hiredis/net.c +++ b/contrib/hiredis/net.c @@ -34,45 +34,78 @@ #include "fmacros.h" #include <sys/types.h> -#include <sys/socket.h> -#include <sys/select.h> -#include <sys/un.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <arpa/inet.h> -#include <unistd.h> #include <fcntl.h> #include <string.h> -#include <netdb.h> #include <errno.h> #include <stdarg.h> #include <stdio.h> -#include <poll.h> #include <limits.h> #include <stdlib.h> #include "net.h" #include "sds.h" +#include "sockcompat.h" +#include "win32.h" /* Defined in hiredis.c */ void __redisSetError(redisContext *c, int type, const char *str); -static void redisContextCloseFd(redisContext *c) { - if (c && c->fd >= 0) { +int redisContextUpdateCommandTimeout(redisContext *c, const struct timeval *timeout); + +void redisNetClose(redisContext *c) { + if (c && c->fd != REDIS_INVALID_FD) { close(c->fd); - c->fd = -1; + c->fd = REDIS_INVALID_FD; + } +} + +ssize_t redisNetRead(redisContext *c, char *buf, size_t bufcap) { + ssize_t nread = recv(c->fd, buf, bufcap, 0); + if (nread == -1) { + if ((errno == EWOULDBLOCK && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) { + /* Try again later */ + return 0; + } else if(errno == ETIMEDOUT && (c->flags & REDIS_BLOCK)) { + /* especially in windows */ + __redisSetError(c, REDIS_ERR_TIMEOUT, "recv timeout"); + return -1; + } else { + __redisSetError(c, REDIS_ERR_IO, strerror(errno)); + return -1; + } + } else if (nread == 0) { + __redisSetError(c, REDIS_ERR_EOF, "Server closed the connection"); + return -1; + } else { + return nread; } } +ssize_t redisNetWrite(redisContext *c) { + ssize_t nwritten; + + nwritten = send(c->fd, c->obuf, sdslen(c->obuf), 0); + if (nwritten < 0) { + if ((errno == EWOULDBLOCK && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) { + /* Try again */ + return 0; + } else { + __redisSetError(c, REDIS_ERR_IO, strerror(errno)); + return -1; + } + } + + return nwritten; +} + static void __redisSetErrorFromErrno(redisContext *c, int type, const char *prefix) { + int errorno = errno; /* snprintf() may change errno */ char buf[128] = { 0 }; - char *p; size_t len = 0; if (prefix != NULL) len = snprintf(buf,sizeof(buf),"%s: ",prefix); - p = buf + len; - __redis_strerror_r(errno, p, sizeof(buf) - len); + strerror_r(errorno, (char *)(buf + len), sizeof(buf) - len); __redisSetError(c,type,buf); } @@ -80,15 +113,15 @@ static int redisSetReuseAddr(redisContext *c) { int on = 1; if (setsockopt(c->fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) { __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL); - redisContextCloseFd(c); + redisNetClose(c); return REDIS_ERR; } return REDIS_OK; } static int redisCreateSocket(redisContext *c, int type) { - int s; - if ((s = socket(type, SOCK_STREAM, 0)) == -1) { + redisFD s; + if ((s = socket(type, SOCK_STREAM, 0)) == REDIS_INVALID_FD) { __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL); return REDIS_ERR; } @@ -102,6 +135,7 @@ static int redisCreateSocket(redisContext *c, int type) { } static int redisSetBlocking(redisContext *c, int blocking) { +#ifndef _WIN32 int flags; /* Set the socket nonblocking. @@ -109,7 +143,7 @@ static int redisSetBlocking(redisContext *c, int blocking) { * interrupted by a signal. */ if ((flags = fcntl(c->fd, F_GETFL)) == -1) { __redisSetErrorFromErrno(c,REDIS_ERR_IO,"fcntl(F_GETFL)"); - redisContextCloseFd(c); + redisNetClose(c); return REDIS_ERR; } @@ -120,16 +154,25 @@ static int redisSetBlocking(redisContext *c, int blocking) { if (fcntl(c->fd, F_SETFL, flags) == -1) { __redisSetErrorFromErrno(c,REDIS_ERR_IO,"fcntl(F_SETFL)"); - redisContextCloseFd(c); + redisNetClose(c); return REDIS_ERR; } +#else + u_long mode = blocking ? 0 : 1; + if (ioctl(c->fd, FIONBIO, &mode) == -1) { + __redisSetErrorFromErrno(c, REDIS_ERR_IO, "ioctl(FIONBIO)"); + redisNetClose(c); + return REDIS_ERR; + } +#endif /* _WIN32 */ return REDIS_OK; } int redisKeepAlive(redisContext *c, int interval) { int val = 1; - int fd = c->fd; + redisFD fd = c->fd; +#ifndef _WIN32 if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) == -1){ __redisSetError(c,REDIS_ERR_OTHER,strerror(errno)); return REDIS_ERR; @@ -137,14 +180,13 @@ int redisKeepAlive(redisContext *c, int interval) { val = interval; -#ifdef _OSX +#if defined(__APPLE__) && defined(__MACH__) if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE, &val, sizeof(val)) < 0) { __redisSetError(c,REDIS_ERR_OTHER,strerror(errno)); return REDIS_ERR; } #else #if defined(__GLIBC__) && !defined(__FreeBSD_kernel__) - val = interval; if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val)) < 0) { __redisSetError(c,REDIS_ERR_OTHER,strerror(errno)); return REDIS_ERR; @@ -164,35 +206,57 @@ int redisKeepAlive(redisContext *c, int interval) { } #endif #endif +#else + int res; + res = win32_redisKeepAlive(fd, interval * 1000); + if (res != 0) { + __redisSetError(c, REDIS_ERR_OTHER, strerror(res)); + return REDIS_ERR; + } +#endif return REDIS_OK; } -static int redisSetTcpNoDelay(redisContext *c) { +int redisSetTcpNoDelay(redisContext *c) { int yes = 1; if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) == -1) { __redisSetErrorFromErrno(c,REDIS_ERR_IO,"setsockopt(TCP_NODELAY)"); - redisContextCloseFd(c); + redisNetClose(c); return REDIS_ERR; } return REDIS_OK; } -#define __MAX_MSEC (((LONG_MAX) - 999) / 1000) +int redisContextSetTcpUserTimeout(redisContext *c, unsigned int timeout) { + int res; +#ifdef TCP_USER_TIMEOUT + res = setsockopt(c->fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout, sizeof(timeout)); +#else + res = -1; + errno = ENOTSUP; + (void)timeout; +#endif + if (res == -1) { + __redisSetErrorFromErrno(c,REDIS_ERR_IO,"setsockopt(TCP_USER_TIMEOUT)"); + redisNetClose(c); + return REDIS_ERR; + } + return REDIS_OK; +} -static int redisContextWaitReady(redisContext *c, const struct timeval *timeout) { - struct pollfd wfd[1]; - long msec; +#define __MAX_MSEC (((LONG_MAX) - 999) / 1000) - msec = -1; - wfd[0].fd = c->fd; - wfd[0].events = POLLOUT; +static int redisContextTimeoutMsec(redisContext *c, long *result) +{ + const struct timeval *timeout = c->connect_timeout; + long msec = -1; /* Only use timeout when not NULL. */ if (timeout != NULL) { if (timeout->tv_usec > 1000000 || timeout->tv_sec > __MAX_MSEC) { - __redisSetErrorFromErrno(c, REDIS_ERR_IO, NULL); - redisContextCloseFd(c); + __redisSetError(c, REDIS_ERR_IO, "Invalid timeout specified"); + *result = msec; return REDIS_ERR; } @@ -203,33 +267,81 @@ static int redisContextWaitReady(redisContext *c, const struct timeval *timeout) } } + *result = msec; + return REDIS_OK; +} + +static int redisContextWaitReady(redisContext *c, long msec) { + struct pollfd wfd[1]; + + wfd[0].fd = c->fd; + wfd[0].events = POLLOUT; + if (errno == EINPROGRESS) { int res; if ((res = poll(wfd, 1, msec)) == -1) { __redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)"); - redisContextCloseFd(c); + redisNetClose(c); return REDIS_ERR; } else if (res == 0) { errno = ETIMEDOUT; __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL); - redisContextCloseFd(c); + redisNetClose(c); return REDIS_ERR; } - if (redisCheckSocketError(c) != REDIS_OK) + if (redisCheckConnectDone(c, &res) != REDIS_OK || res == 0) { + redisCheckSocketError(c); return REDIS_ERR; + } return REDIS_OK; } __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL); - redisContextCloseFd(c); + redisNetClose(c); return REDIS_ERR; } +int redisCheckConnectDone(redisContext *c, int *completed) { + int rc = connect(c->fd, (const struct sockaddr *)c->saddr, c->addrlen); + if (rc == 0) { + *completed = 1; + return REDIS_OK; + } + int error = errno; + if (error == EINPROGRESS) { + /* must check error to see if connect failed. Get the socket error */ + int fail, so_error; + socklen_t optlen = sizeof(so_error); + fail = getsockopt(c->fd, SOL_SOCKET, SO_ERROR, &so_error, &optlen); + if (fail == 0) { + if (so_error == 0) { + /* Socket is connected! */ + *completed = 1; + return REDIS_OK; + } + /* connection error; */ + errno = so_error; + error = so_error; + } + } + switch (error) { + case EISCONN: + *completed = 1; + return REDIS_OK; + case EALREADY: + case EWOULDBLOCK: + *completed = 0; + return REDIS_OK; + default: + return REDIS_ERR; + } +} + int redisCheckSocketError(redisContext *c) { - int err = 0; + int err = 0, errno_saved = errno; socklen_t errlen = sizeof(err); if (getsockopt(c->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) { @@ -237,6 +349,10 @@ int redisCheckSocketError(redisContext *c) { return REDIS_ERR; } + if (err == 0) { + err = errno_saved; + } + if (err) { errno = err; __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL); @@ -247,27 +363,69 @@ int redisCheckSocketError(redisContext *c) { } int redisContextSetTimeout(redisContext *c, const struct timeval tv) { - if (setsockopt(c->fd,SOL_SOCKET,SO_RCVTIMEO,&tv,sizeof(tv)) == -1) { + const void *to_ptr = &tv; + size_t to_sz = sizeof(tv); + + if (redisContextUpdateCommandTimeout(c, &tv) != REDIS_OK) { + __redisSetError(c, REDIS_ERR_OOM, "Out of memory"); + return REDIS_ERR; + } + if (setsockopt(c->fd,SOL_SOCKET,SO_RCVTIMEO,to_ptr,to_sz) == -1) { __redisSetErrorFromErrno(c,REDIS_ERR_IO,"setsockopt(SO_RCVTIMEO)"); return REDIS_ERR; } - if (setsockopt(c->fd,SOL_SOCKET,SO_SNDTIMEO,&tv,sizeof(tv)) == -1) { + if (setsockopt(c->fd,SOL_SOCKET,SO_SNDTIMEO,to_ptr,to_sz) == -1) { __redisSetErrorFromErrno(c,REDIS_ERR_IO,"setsockopt(SO_SNDTIMEO)"); return REDIS_ERR; } return REDIS_OK; } +int redisContextUpdateConnectTimeout(redisContext *c, const struct timeval *timeout) { + /* Same timeval struct, short circuit */ + if (c->connect_timeout == timeout) + return REDIS_OK; + + /* Allocate context timeval if we need to */ + if (c->connect_timeout == NULL) { + c->connect_timeout = hi_malloc(sizeof(*c->connect_timeout)); + if (c->connect_timeout == NULL) + return REDIS_ERR; + } + + memcpy(c->connect_timeout, timeout, sizeof(*c->connect_timeout)); + return REDIS_OK; +} + +int redisContextUpdateCommandTimeout(redisContext *c, const struct timeval *timeout) { + /* Same timeval struct, short circuit */ + if (c->command_timeout == timeout) + return REDIS_OK; + + /* Allocate context timeval if we need to */ + if (c->command_timeout == NULL) { + c->command_timeout = hi_malloc(sizeof(*c->command_timeout)); + if (c->command_timeout == NULL) + return REDIS_ERR; + } + + memcpy(c->command_timeout, timeout, sizeof(*c->command_timeout)); + return REDIS_OK; +} + static int _redisContextConnectTcp(redisContext *c, const char *addr, int port, const struct timeval *timeout, const char *source_addr) { - int s, rv, n; + redisFD s; + int rv, n; char _port[6]; /* strlen("65535"); */ struct addrinfo hints, *servinfo, *bservinfo, *p, *b; int blocking = (c->flags & REDIS_BLOCK); int reuseaddr = (c->flags & REDIS_REUSEADDR); int reuses = 0; + long timeout_msec = -1; + servinfo = NULL; c->connection_type = REDIS_CONN_TCP; c->tcp.port = port; @@ -279,57 +437,61 @@ static int _redisContextConnectTcp(redisContext *c, const char *addr, int port, * This is a bit ugly, but atleast it works and doesn't leak memory. **/ if (c->tcp.host != addr) { - if (c->tcp.host) - free(c->tcp.host); + hi_free(c->tcp.host); - c->tcp.host = strdup(addr); + c->tcp.host = hi_strdup(addr); + if (c->tcp.host == NULL) + goto oom; } if (timeout) { - if (c->timeout != timeout) { - if (c->timeout == NULL) - c->timeout = malloc(sizeof(struct timeval)); - - memcpy(c->timeout, timeout, sizeof(struct timeval)); - } + if (redisContextUpdateConnectTimeout(c, timeout) == REDIS_ERR) + goto oom; } else { - if (c->timeout) - free(c->timeout); - c->timeout = NULL; + hi_free(c->connect_timeout); + c->connect_timeout = NULL; + } + + if (redisContextTimeoutMsec(c, &timeout_msec) != REDIS_OK) { + goto error; } if (source_addr == NULL) { - free(c->tcp.source_addr); + hi_free(c->tcp.source_addr); c->tcp.source_addr = NULL; } else if (c->tcp.source_addr != source_addr) { - free(c->tcp.source_addr); - c->tcp.source_addr = strdup(source_addr); + hi_free(c->tcp.source_addr); + c->tcp.source_addr = hi_strdup(source_addr); } snprintf(_port, 6, "%d", port); memset(&hints,0,sizeof(hints)); hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; - if (!blocking) { - /* Rspamd specific: never try to resolve on non-blocking conn requests */ - hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV; - } - - /* Try with IPv6 if no IPv4 address was found. We do it in this order since - * in a Redis client you can't afford to test if you have IPv6 connectivity - * as this would add latency to every connect. Otherwise a more sensible - * route could be: Use IPv6 if both addresses are available and there is IPv6 - * connectivity. */ - if ((rv = getaddrinfo(c->tcp.host,_port,&hints,&servinfo)) != 0) { - hints.ai_family = AF_INET6; - if ((rv = getaddrinfo(addr,_port,&hints,&servinfo)) != 0) { - __redisSetError(c,REDIS_ERR_OTHER,gai_strerror(rv)); - return REDIS_ERR; - } + + /* DNS lookup. To use dual stack, set both flags to prefer both IPv4 and + * IPv6. By default, for historical reasons, we try IPv4 first and then we + * try IPv6 only if no IPv4 address was found. */ + if (c->flags & REDIS_PREFER_IPV6 && c->flags & REDIS_PREFER_IPV4) + hints.ai_family = AF_UNSPEC; + else if (c->flags & REDIS_PREFER_IPV6) + hints.ai_family = AF_INET6; + else + hints.ai_family = AF_INET; + + rv = getaddrinfo(c->tcp.host, _port, &hints, &servinfo); + if (rv != 0 && hints.ai_family != AF_UNSPEC) { + /* Try again with the other IP version. */ + hints.ai_family = (hints.ai_family == AF_INET) ? AF_INET6 : AF_INET; + rv = getaddrinfo(c->tcp.host, _port, &hints, &servinfo); + } + if (rv != 0) { + __redisSetError(c, REDIS_ERR_OTHER, gai_strerror(rv)); + return REDIS_ERR; } for (p = servinfo; p != NULL; p = p->ai_next) { addrretry: - if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1) + if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == REDIS_INVALID_FD) continue; c->fd = s; @@ -349,6 +511,7 @@ addrretry: n = 1; if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*) &n, sizeof(n)) < 0) { + freeaddrinfo(bservinfo); goto error; } } @@ -367,32 +530,50 @@ addrretry: goto error; } } - if (redisSetTcpNoDelay(c) != REDIS_OK) - goto error; + + /* For repeat connection */ + hi_free(c->saddr); + c->saddr = hi_malloc(p->ai_addrlen); + if (c->saddr == NULL) + goto oom; + + memcpy(c->saddr, p->ai_addr, p->ai_addrlen); + c->addrlen = p->ai_addrlen; + if (connect(s,p->ai_addr,p->ai_addrlen) == -1) { if (errno == EHOSTUNREACH) { - redisContextCloseFd(c); + redisNetClose(c); continue; - } else if (errno == EINPROGRESS && !blocking) { - /* This is ok. */ + } else if (errno == EINPROGRESS) { + if (blocking) { + goto wait_for_ready; + } + /* This is ok. + * Note that even when it's in blocking mode, we unset blocking + * for `connect()` + */ } else if (errno == EADDRNOTAVAIL && reuseaddr) { if (++reuses >= REDIS_CONNECT_RETRIES) { goto error; } else { + redisNetClose(c); goto addrretry; } } else { - if (redisContextWaitReady(c,c->timeout) != REDIS_OK) + wait_for_ready: + if (redisContextWaitReady(c,timeout_msec) != REDIS_OK) + goto error; + if (redisSetTcpNoDelay(c) != REDIS_OK) goto error; } } + if (blocking && redisSetBlocking(c,1) != REDIS_OK) + goto error; c->flags |= REDIS_CONNECTED; rv = REDIS_OK; goto end; } - if (blocking && redisSetBlocking(c,1) != REDIS_OK) - goto error; if (p == NULL) { char buf[128]; snprintf(buf,sizeof(buf),"Can't create socket: %s",strerror(errno)); @@ -400,10 +581,15 @@ addrretry: goto error; } +oom: + __redisSetError(c, REDIS_ERR_OOM, "Out of memory"); error: rv = REDIS_ERR; end: - freeaddrinfo(servinfo); + if(servinfo) { + freeaddrinfo(servinfo); + } + return rv; // Need to return REDIS_OK if alright } @@ -419,38 +605,51 @@ int redisContextConnectBindTcp(redisContext *c, const char *addr, int port, } int redisContextConnectUnix(redisContext *c, const char *path, const struct timeval *timeout) { +#ifndef _WIN32 int blocking = (c->flags & REDIS_BLOCK); - struct sockaddr_un sa; + struct sockaddr_un *sa; + long timeout_msec = -1; - if (redisCreateSocket(c,AF_LOCAL) < 0) + if (redisCreateSocket(c,AF_UNIX) < 0) return REDIS_ERR; if (redisSetBlocking(c,0) != REDIS_OK) return REDIS_ERR; c->connection_type = REDIS_CONN_UNIX; - if (c->unix_sock.path != path) - c->unix_sock.path = strdup(path); + if (c->unix_sock.path != path) { + hi_free(c->unix_sock.path); - if (timeout) { - if (c->timeout != timeout) { - if (c->timeout == NULL) - c->timeout = malloc(sizeof(struct timeval)); + c->unix_sock.path = hi_strdup(path); + if (c->unix_sock.path == NULL) + goto oom; + } - memcpy(c->timeout, timeout, sizeof(struct timeval)); - } + if (timeout) { + if (redisContextUpdateConnectTimeout(c, timeout) == REDIS_ERR) + goto oom; } else { - if (c->timeout) - free(c->timeout); - c->timeout = NULL; + hi_free(c->connect_timeout); + c->connect_timeout = NULL; } - sa.sun_family = AF_LOCAL; - strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1); - if (connect(c->fd, (struct sockaddr*)&sa, sizeof(sa)) == -1) { + if (redisContextTimeoutMsec(c,&timeout_msec) != REDIS_OK) + return REDIS_ERR; + + /* Don't leak sockaddr if we're reconnecting */ + if (c->saddr) hi_free(c->saddr); + + sa = (struct sockaddr_un*)(c->saddr = hi_malloc(sizeof(struct sockaddr_un))); + if (sa == NULL) + goto oom; + + c->addrlen = sizeof(struct sockaddr_un); + sa->sun_family = AF_UNIX; + strncpy(sa->sun_path, path, sizeof(sa->sun_path) - 1); + if (connect(c->fd, (struct sockaddr*)sa, sizeof(*sa)) == -1) { if (errno == EINPROGRESS && !blocking) { /* This is ok. */ } else { - if (redisContextWaitReady(c,c->timeout) != REDIS_OK) + if (redisContextWaitReady(c,timeout_msec) != REDIS_OK) return REDIS_ERR; } } @@ -461,4 +660,13 @@ int redisContextConnectUnix(redisContext *c, const char *path, const struct time c->flags |= REDIS_CONNECTED; return REDIS_OK; +#else + /* We currently do not support Unix sockets for Windows. */ + /* TODO(m): https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/ */ + errno = EPROTONOSUPPORT; + return REDIS_ERR; +#endif /* _WIN32 */ +oom: + __redisSetError(c, REDIS_ERR_OOM, "Out of memory"); + return REDIS_ERR; } |