msg_debug_rpool ("active connection removed");
if (conn->ctx) {
- g_hash_table_remove (conn->elt->pool->elts_by_ctx, conn->ctx);
- redisAsyncFree (conn->ctx);
+ if (!(conn->ctx->c.flags & REDIS_FREEING)) {
+ redisAsyncContext *ac = conn->ctx;
+
+ conn->ctx = NULL;
+ g_hash_table_remove (conn->elt->pool->elts_by_ctx, conn->ctx);
+ ac->onDisconnect = NULL;
+ redisAsyncFree (ac);
+ }
}
- g_queue_unlink (conn->elt->active, conn->entry);
+ if (conn->entry) {
+ g_queue_unlink (conn->elt->active, conn->entry);
+ }
}
else {
msg_debug_rpool ("inactive connection removed");
redisAsyncFree (ac);
}
- g_queue_unlink (conn->elt->inactive, conn->entry);
+ if (conn->entry) {
+ g_queue_unlink (conn->elt->inactive, conn->entry);
+ }
}
- g_list_free (conn->entry);
+ if (conn->entry) {
+ g_list_free (conn->entry);
+ }
+
g_slice_free1 (sizeof (*conn), conn);
}
for (cur = elt->active->head; cur != NULL; cur = g_list_next (cur)) {
c = cur->data;
+ c->entry = NULL;
REF_RELEASE (c);
}
for (cur = elt->inactive->head; cur != NULL; cur = g_list_next (cur)) {
c = cur->data;
+ c->entry = NULL;
REF_RELEASE (c);
}
struct rspamd_redis_pool_connection *conn;
struct redisAsyncContext *ctx;
- ctx = redisAsyncConnect (ip, port);
+ if (*ip == '/' || *ip == '.') {
+ ctx = redisAsyncConnectUnix (ip);
+ }
+ else {
+ ctx = redisAsyncConnect (ip, port);
+ }
if (ctx) {
conn_entry = g_queue_pop_head_link (elt->inactive);
conn = conn_entry->data;
- if (event_get_base (&conn->timeout)) {
- event_del (&conn->timeout);
- }
-
if (conn->ctx->err == REDIS_OK) {
+ if (event_get_base (&conn->timeout)) {
+ event_del (&conn->timeout);
+ }
+
conn->active = TRUE;
g_queue_push_tail_link (elt->active, conn_entry);
msg_debug_rpool ("reused existing connection to %s:%d", ip, port);
}
else {
+ g_list_free (conn->entry);
+ conn->entry = NULL;
REF_RELEASE (conn);
conn = rspamd_redis_pool_new_connection (pool, elt,
db, password, ip, port);
REF_RELEASE (conn);
}
else {
- /* Just move it to the inactive queue */
- g_queue_unlink (conn->elt->active, conn->entry);
- g_queue_push_head_link (conn->elt->inactive, conn->entry);
- conn->active = FALSE;
- rspamd_redis_pool_schedule_timeout (conn);
- msg_debug_rpool ("mark connection inactive");
+ /* Ensure that there are no callbacks attached to this conn */
+ if (ctx->replies.head == NULL) {
+ /* Just move it to the inactive queue */
+ g_queue_unlink (conn->elt->active, conn->entry);
+ g_queue_push_head_link (conn->elt->inactive, conn->entry);
+ conn->active = FALSE;
+ rspamd_redis_pool_schedule_timeout (conn);
+ msg_debug_rpool ("mark connection inactive");
+ }
+ else {
+ msg_debug_rpool ("closed connection due to callbacks leftover");
+ REF_RELEASE (conn);
+ }
}
REF_RELEASE (conn);