You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

libkvstorageclient.c 31KB


  1. /* Copyright (c) 2010, Vsevolod Stakhov
  2. * All rights reserved.
  3. *
  4. * Redistribution and use in source and binary forms, with or without
  5. * modification, are permitted provided that the following conditions are met:
  6. * * Redistributions of source code must retain the above copyright
  7. * notice, this list of conditions and the following disclaimer.
  8. * * Redistributions in binary form must reproduce the above copyright
  9. * notice, this list of conditions and the following disclaimer in the
  10. * documentation and/or other materials provided with the distribution.
  11. *
  12. * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
  13. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  14. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  15. * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
  16. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  17. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  18. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  19. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  20. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  21. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  22. */
  23. #include "config.h"
  24. #include "mem_pool.h"
  25. #include "util.h"
  26. #include "libkvstorageclient.h"
  27. #define MAX_KV_LINE 1024
  28. #ifdef CRLF
  29. #undef CRLF
  30. #undef CR
  31. #undef LF
  32. #endif
  33. #define CRLF "\r\n"
  34. #define CR '\r'
  35. #define LF '\n'
  36. struct kvstorage_buf {
  37. guint pos;
  38. guint len;
  39. guint8 data[1];
  40. };
  41. struct rspamd_kvstorage_connection {
  42. gboolean asynced;
  43. gint sock;
  44. struct timeval tv;
  45. enum {
  46. KV_STATE_NONE = 0,
  47. KV_STATE_CONNECTED,
  48. KV_STATE_SET,
  49. KV_STATE_GET,
  50. KV_STATE_WRITE_DATA,
  51. KV_STATE_READ_DATA,
  52. KV_STATE_READ_ELT,
  53. KV_STATE_READ_REPLY
  54. } state;
  55. struct event ev;
  56. kvstorage_connect_cb conn_cb;
  57. kvstorage_read_cb read_cb;
  58. kvstorage_write_cb write_cb;
  59. memory_pool_t *pool;
  60. };
  61. struct rspamd_kvstorage_async_data {
  62. struct rspamd_kvstorage_connection *c;
  63. struct kvstorage_buf *buf;
  64. gchar *key;
  65. guint keylen;
  66. gpointer data;
  67. guint datalen;
  68. guint expire;
  69. gpointer ud;
  70. };
  71. /*
  72. * Buffer functions
  73. */
  74. /*
  75. * Create new kvstorage_buf
  76. */
  77. static struct kvstorage_buf *
  78. rspamd_kvstorage_buf_create (guint size, memory_pool_t *pool)
  79. {
  80. struct kvstorage_buf *new;
  81. new = memory_pool_alloc (pool, sizeof (struct kvstorage_buf) + size);
  82. new->len = size;
  83. new->pos = 0;
  84. return new;
  85. }
  86. /*
  87. * Read a single line synced or asynced
  88. */
  89. static gint
  90. rspamd_kvstorage_buf_readline (struct kvstorage_buf *buf, struct rspamd_kvstorage_connection *conn)
  91. {
  92. gint r;
  93. guint8 *p;
  94. r = read (conn->sock, buf->data, buf->len);
  95. if (r == -1) {
  96. return errno;
  97. }
  98. /* Try to parse what we have */
  99. p = buf->data;
  100. while (p - buf->data < r) {
  101. if (*p == '\r' || *p == '\n') {
  102. buf->pos = p - buf->data;
  103. return 0;
  104. }
  105. p ++;
  106. }
  107. if (r == (gint)buf->len) {
  108. /* Buffer is overflowed */
  109. return EOVERFLOW;
  110. }
  111. /* Line end not found */
  112. return EAGAIN;
  113. }
  114. /*
  115. * Read the whole buffer, return remaining characters or -1
  116. */
  117. static gint
  118. rspamd_kvstorage_buf_readall (struct kvstorage_buf *buf, struct rspamd_kvstorage_connection *conn)
  119. {
  120. gint r;
  121. if (buf->len - buf->pos == 0) {
  122. return 0;
  123. }
  124. r = read (conn->sock, buf->data + buf->pos, buf->len - buf->pos);
  125. if (r == -1) {
  126. return -1;
  127. }
  128. buf->pos += r;
  129. /* Line end not found */
  130. return buf->len - buf->pos;
  131. }
  132. /*
  133. * Write the whole buffer, return remaining characters or -1
  134. */
  135. static gint
  136. rspamd_kvstorage_buf_writeall (struct kvstorage_buf *buf, struct rspamd_kvstorage_connection *conn)
  137. {
  138. gint r;
  139. if (buf->len - buf->pos == 0) {
  140. return 0;
  141. }
  142. r = write (conn->sock, buf->data + buf->pos, buf->len - buf->pos);
  143. if (r == -1) {
  144. return -1;
  145. }
  146. buf->pos += r;
  147. /* Line end not found */
  148. return buf->len - buf->pos;
  149. }
  150. /*
  151. * Drain line from the begin of buffer, moving it from the beginning of buf
  152. */
  153. static void
  154. rspamd_kvstorage_buf_drainline (struct kvstorage_buf *buf)
  155. {
  156. guint8 *p;
  157. p = buf->data + buf->pos;
  158. /* Skip \r and \n characters */
  159. while (p - buf->data < (gint)buf->len && (*p == '\r' || *p == '\n')) {
  160. p ++;
  161. }
  162. if (p - buf->data == (gint)buf->len) {
  163. /* Do not move anything */
  164. buf->pos = 0;
  165. return;
  166. }
  167. memcpy (buf->data, p, buf->len - (p - buf->data));
  168. buf->pos = buf->len - (p - buf->data);
  169. }
  170. /* Common utility functions */
  171. /*
  172. * Parse reply line that contains an error
  173. */
  174. static enum rspamd_kvstorage_error
  175. rspamd_kvstorage_parse_reply_error (struct kvstorage_buf *buf)
  176. {
  177. guint8 *p;
  178. guint l = 0;
  179. /* Get one word */
  180. p = buf->data;
  181. while (p - buf->data < (gint)buf->pos) {
  182. if (g_ascii_isspace (*p)) {
  183. while (p - buf->data < (gint)buf->pos && g_ascii_isspace (*p)) {
  184. p ++;
  185. }
  186. break;
  187. }
  188. p ++;
  189. l ++;
  190. }
  191. /* Get common errors */
  192. if (g_ascii_strncasecmp (buf->data, "ERROR", MIN (l, sizeof("ERORR") - 1)) == 0) {
  193. return KVSTORAGE_ERROR_INTERNAL_ERROR;
  194. }
  195. else if (g_ascii_strncasecmp (buf->data, "SERVER_ERROR", MIN (l, sizeof("SERVER_ERORR") - 1)) == 0) {
  196. return KVSTORAGE_ERROR_SERVER_ERROR;
  197. }
  198. else if (g_ascii_strncasecmp (buf->data, "CLIENT_ERROR", MIN (l, sizeof("CLIENT_ERORR") - 1)) == 0) {
  199. return KVSTORAGE_ERROR_CLIENT_ERROR;
  200. }
  201. else if (g_ascii_strncasecmp (buf->data, "NOT_STORED", MIN (l, sizeof("NOT_STORED") - 1)) == 0) {
  202. return KVSTORAGE_ERROR_NOT_STORED;
  203. }
  204. else if (g_ascii_strncasecmp (buf->data, "NOT_FOUND", MIN (l, sizeof("NOT_FOUND") - 1)) == 0) {
  205. return KVSTORAGE_ERROR_NOT_FOUND;
  206. }
  207. else if (g_ascii_strncasecmp (buf->data, "EXISTS", MIN (l, sizeof("EXISTS") - 1)) == 0) {
  208. return KVSTORAGE_ERROR_EXISTS;
  209. }
  210. else if (g_ascii_strncasecmp (buf->data, "STORED", MIN (l, sizeof("STORED") - 1)) == 0) {
  211. return KVSTORAGE_ERROR_OK;
  212. }
  213. else if (g_ascii_strncasecmp (buf->data, "DELETED", MIN (l, sizeof("DELETED") - 1)) == 0) {
  214. return KVSTORAGE_ERROR_OK;
  215. }
  216. return KVSTORAGE_ERROR_INTERNAL_ERROR;
  217. }
  218. /*
  219. * Parse reply line, store element length
  220. */
  221. static enum rspamd_kvstorage_error
  222. rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *flags)
  223. {
  224. guint8 *p, *c, *end;
  225. gboolean error = TRUE;
  226. gchar *err_str;
  227. p = buf->data;
  228. end = buf->data + buf->pos;
  229. while (p < end) {
  230. if (g_ascii_isspace (*p)) {
  231. error = FALSE;
  232. while (p - buf->data < (gint)buf->pos && g_ascii_isspace (*p)) {
  233. p ++;
  234. }
  235. break;
  236. }
  237. p ++;
  238. }
  239. /* Here we got a word or error flag */
  240. if (error) {
  241. /* Something wrong here */
  242. return KVSTORAGE_ERROR_SERVER_ERROR;
  243. }
  244. if (g_ascii_strncasecmp (buf->data, "VALUE", sizeof ("VALUE") - 1) != 0) {
  245. return rspamd_kvstorage_parse_reply_error (buf);
  246. }
  247. /* Here we got key, flags and size items */
  248. /* Skip key */
  249. error = TRUE;
  250. while (p < end) {
  251. if (g_ascii_isspace (*p)) {
  252. error = FALSE;
  253. /* Skip spaces after key */
  254. while (p < end && g_ascii_isspace (*p)) {
  255. p ++;
  256. }
  257. break;
  258. }
  259. p ++;
  260. }
  261. if (error) {
  262. /* Something wrong here */
  263. return KVSTORAGE_ERROR_SERVER_ERROR;
  264. }
  265. /* Read flags */
  266. c = p;
  267. error = TRUE;
  268. while (p < end) {
  269. if (g_ascii_isspace (*p)) {
  270. error = FALSE;
  271. /* Skip spaces after flags */
  272. while (p - buf->data < (gint)buf->pos && g_ascii_isspace (*p)) {
  273. p ++;
  274. }
  275. break;
  276. }
  277. else if (!g_ascii_isdigit (*p)) {
  278. break;
  279. }
  280. p ++;
  281. }
  282. if (error) {
  283. /* Something wrong here */
  284. return KVSTORAGE_ERROR_SERVER_ERROR;
  285. }
  286. *flags = strtoul (c, &err_str, 10);
  287. if (!g_ascii_isspace (*err_str)) {
  288. return KVSTORAGE_ERROR_SERVER_ERROR;
  289. }
  290. /* Read len */
  291. c = p;
  292. while (p < end) {
  293. if (!g_ascii_isdigit (*p)) {
  294. break;
  295. }
  296. p ++;
  297. }
  298. if (error) {
  299. /* Something wrong here */
  300. return KVSTORAGE_ERROR_SERVER_ERROR;
  301. }
  302. *len = strtoul (c, &err_str, 10);
  303. if (!g_ascii_isspace (*err_str)) {
  304. return KVSTORAGE_ERROR_SERVER_ERROR;
  305. }
  306. return KVSTORAGE_ERROR_OK;
  307. }
  308. /* Callbacks for async API */
  309. static void
  310. rspamd_kvstorage_connect_cb (int fd, short what, gpointer ud)
  311. {
  312. struct rspamd_kvstorage_async_data *d = ud;
  313. kvstorage_connect_cb cb;
  314. cb = (kvstorage_connect_cb)d->c->conn_cb;
  315. if (what == EV_TIMEOUT) {
  316. cb (KVSTORAGE_ERROR_TIMEOUT, d->c, d->ud);
  317. }
  318. else {
  319. d->c->state = KV_STATE_CONNECTED;
  320. cb (KVSTORAGE_ERROR_OK, d->c, d->ud);
  321. }
  322. }
  323. static void
  324. rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
  325. {
  326. struct rspamd_kvstorage_async_data *d = ud;
  327. kvstorage_read_cb cb;
  328. guint buflen, flags;
  329. gint r;
  330. struct kvstorage_buf *databuf;
  331. cb = (kvstorage_read_cb)d->c->read_cb;
  332. if (what == EV_TIMEOUT) {
  333. cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, NULL, 0, d->c, d->ud);
  334. return;
  335. }
  336. if (d->c->state == KV_STATE_GET) {
  337. /* Create, fill and write buffer */
  338. buflen = d->keylen + sizeof ("get " CRLF);
  339. d->buf = rspamd_kvstorage_buf_create (buflen, d->c->pool);
  340. r = rspamd_snprintf (d->buf->data, d->buf->len, "get %*s" CRLF,
  341. d->keylen, d->key);
  342. d->buf->len = r;
  343. r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
  344. if (r > 0) {
  345. /* Write more data at next call of this function */
  346. d->c->state = KV_STATE_WRITE_DATA;
  347. /* Event magic */
  348. event_del (&d->c->ev);
  349. event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_read_cb, d);
  350. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  351. event_add (&d->c->ev, &d->c->tv);
  352. }
  353. else {
  354. event_add (&d->c->ev, NULL);
  355. }
  356. }
  357. else if (r == 0) {
  358. /* We have written everything */
  359. d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
  360. d->c->state = KV_STATE_READ_ELT;
  361. event_del (&d->c->ev);
  362. event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
  363. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  364. event_add (&d->c->ev, &d->c->tv);
  365. }
  366. else {
  367. event_add (&d->c->ev, NULL);
  368. }
  369. }
  370. else {
  371. /* Error occured during writing */
  372. cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
  373. }
  374. }
  375. else if (d->c->state == KV_STATE_WRITE_DATA) {
  376. r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
  377. if (r > 0) {
  378. /* Write more data at next call of this function */
  379. d->c->state = KV_STATE_WRITE_DATA;
  380. /* Event magic */
  381. event_del (&d->c->ev);
  382. event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_read_cb, d);
  383. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  384. event_add (&d->c->ev, &d->c->tv);
  385. }
  386. else {
  387. event_add (&d->c->ev, NULL);
  388. }
  389. }
  390. else if (r == 0) {
  391. /* We have written everything */
  392. d->c->state = KV_STATE_READ_ELT;
  393. d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
  394. event_del (&d->c->ev);
  395. event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
  396. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  397. event_add (&d->c->ev, &d->c->tv);
  398. }
  399. else {
  400. event_add (&d->c->ev, NULL);
  401. }
  402. }
  403. else {
  404. /* Error occured during writing */
  405. cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
  406. }
  407. }
  408. else if (d->c->state == KV_STATE_READ_ELT) {
  409. /* Read element info */
  410. r = rspamd_kvstorage_buf_readline (d->buf, d->c);
  411. if (r == EAGAIN) {
  412. /* Read more data */
  413. event_del (&d->c->ev);
  414. event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
  415. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  416. event_add (&d->c->ev, &d->c->tv);
  417. }
  418. else {
  419. event_add (&d->c->ev, NULL);
  420. }
  421. }
  422. else if (r == 0) {
  423. /* Got all data about elt */
  424. if ((r = rspamd_kvstorage_parse_get_line (d->buf, &d->datalen, &flags)) != KVSTORAGE_ERROR_OK) {
  425. cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
  426. return;
  427. }
  428. rspamd_kvstorage_buf_drainline (d->buf);
  429. /* Now allocate and read the data */
  430. databuf = rspamd_kvstorage_buf_create (d->datalen, d->c->pool);
  431. memcpy (databuf->data, d->buf->data, d->buf->pos);
  432. d->buf = databuf;
  433. d->c->state = KV_STATE_READ_DATA;
  434. event_del (&d->c->ev);
  435. event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
  436. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  437. event_add (&d->c->ev, &d->c->tv);
  438. }
  439. else {
  440. event_add (&d->c->ev, NULL);
  441. }
  442. }
  443. else {
  444. /* Error occured during reading reply line */
  445. cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
  446. }
  447. }
  448. else if (d->c->state == KV_STATE_READ_DATA) {
  449. /* Read data to the buffer */
  450. r = rspamd_kvstorage_buf_readall (d->buf, d->c);
  451. if (r == 0) {
  452. /* All data read, read the last line */
  453. d->c->state = KV_STATE_READ_REPLY;
  454. /* Save databuf */
  455. d->data = d->buf->data;
  456. event_del (&d->c->ev);
  457. event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
  458. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  459. event_add (&d->c->ev, &d->c->tv);
  460. }
  461. else {
  462. event_add (&d->c->ev, NULL);
  463. }
  464. }
  465. else if (r > 0) {
  466. /* Read more data into buffer */
  467. event_del (&d->c->ev);
  468. event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
  469. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  470. event_add (&d->c->ev, &d->c->tv);
  471. }
  472. else {
  473. event_add (&d->c->ev, NULL);
  474. }
  475. }
  476. else {
  477. /* Error occured */
  478. cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
  479. }
  480. }
  481. else if (d->c->state == KV_STATE_READ_REPLY) {
  482. /* Got something from server, try to read line */
  483. r = rspamd_kvstorage_buf_readline (d->buf, d->c);
  484. if (r == EAGAIN) {
  485. /* Read more data */
  486. event_del (&d->c->ev);
  487. event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
  488. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  489. event_add (&d->c->ev, &d->c->tv);
  490. }
  491. else {
  492. event_add (&d->c->ev, NULL);
  493. }
  494. }
  495. else if (r == 0) {
  496. d->c->state = KV_STATE_CONNECTED;
  497. cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->data, d->datalen, d->c, d->ud);
  498. }
  499. else {
  500. /* Error occured during reading reply line */
  501. cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
  502. }
  503. }
  504. }
  505. static void
  506. rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
  507. {
  508. struct rspamd_kvstorage_async_data *d = ud;
  509. kvstorage_write_cb cb;
  510. guint buflen;
  511. gint r;
  512. cb = (kvstorage_write_cb)d->c->write_cb;
  513. if (what == EV_TIMEOUT) {
  514. cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, d->c, d->ud);
  515. return;
  516. }
  517. if (d->c->state == KV_STATE_SET) {
  518. /* Create, fill and write buffer */
  519. buflen = d->datalen + d->keylen + sizeof ("set 4294967296 4294967296 4294967296" CRLF);
  520. d->buf = rspamd_kvstorage_buf_create (buflen, d->c->pool);
  521. r = rspamd_snprintf (d->buf->data, d->buf->len, "set %*s %ud %ud %ud" CRLF "%*s",
  522. d->keylen, d->key, 0, d->expire, d->datalen, d->datalen, d->data);
  523. d->buf->len = r;
  524. r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
  525. if (r > 0) {
  526. /* Write more data at next call of this function */
  527. d->c->state = KV_STATE_WRITE_DATA;
  528. /* Event magic */
  529. event_del (&d->c->ev);
  530. event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_write_cb, d);
  531. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  532. event_add (&d->c->ev, &d->c->tv);
  533. }
  534. else {
  535. event_add (&d->c->ev, NULL);
  536. }
  537. }
  538. else if (r == 0) {
  539. /* We have written everything */
  540. d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
  541. d->c->state = KV_STATE_READ_REPLY;
  542. event_del (&d->c->ev);
  543. event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d);
  544. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  545. event_add (&d->c->ev, &d->c->tv);
  546. }
  547. else {
  548. event_add (&d->c->ev, NULL);
  549. }
  550. }
  551. else {
  552. /* Error occured during writing */
  553. cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
  554. }
  555. }
  556. else if (d->c->state == KV_STATE_WRITE_DATA) {
  557. r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
  558. if (r > 0) {
  559. /* Write more data at next call of this function */
  560. d->c->state = KV_STATE_WRITE_DATA;
  561. /* Event magic */
  562. event_del (&d->c->ev);
  563. event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_write_cb, d);
  564. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  565. event_add (&d->c->ev, &d->c->tv);
  566. }
  567. else {
  568. event_add (&d->c->ev, NULL);
  569. }
  570. }
  571. else if (r == 0) {
  572. /* We have written everything */
  573. d->c->state = KV_STATE_READ_REPLY;
  574. d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
  575. event_del (&d->c->ev);
  576. event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d);
  577. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  578. event_add (&d->c->ev, &d->c->tv);
  579. }
  580. else {
  581. event_add (&d->c->ev, NULL);
  582. }
  583. }
  584. else {
  585. /* Error occured during writing */
  586. cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
  587. }
  588. }
  589. else if (d->c->state == KV_STATE_READ_REPLY) {
  590. /* Got something from server, try to read line */
  591. r = rspamd_kvstorage_buf_readline (d->buf, d->c);
  592. if (r == EAGAIN) {
  593. /* Read more data */
  594. event_del (&d->c->ev);
  595. event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d);
  596. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  597. event_add (&d->c->ev, &d->c->tv);
  598. }
  599. else {
  600. event_add (&d->c->ev, NULL);
  601. }
  602. }
  603. else if (r == 0) {
  604. d->c->state = KV_STATE_CONNECTED;
  605. cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->c, d->ud);
  606. }
  607. else {
  608. /* Error occured during reading reply line */
  609. cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
  610. }
  611. }
  612. }
  613. static void
  614. rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
  615. {
  616. struct rspamd_kvstorage_async_data *d = ud;
  617. kvstorage_write_cb cb;
  618. guint buflen;
  619. gint r;
  620. cb = (kvstorage_write_cb)d->c->write_cb;
  621. if (what == EV_TIMEOUT) {
  622. cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, d->c, d->ud);
  623. return;
  624. }
  625. if (d->c->state == KV_STATE_SET) {
  626. /* Create, fill and write buffer */
  627. buflen = MAX (MAX_KV_LINE, d->keylen + sizeof ("delete " CRLF));
  628. d->buf = rspamd_kvstorage_buf_create (buflen, d->c->pool);
  629. r = rspamd_snprintf (d->buf->data, d->buf->len, "delete %*s" CRLF,
  630. d->keylen, d->key);
  631. d->buf->len = r;
  632. r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
  633. if (r > 0) {
  634. /* Write more data at next call of this function */
  635. d->c->state = KV_STATE_WRITE_DATA;
  636. /* Event magic */
  637. event_del (&d->c->ev);
  638. event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_delete_cb, d);
  639. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  640. event_add (&d->c->ev, &d->c->tv);
  641. }
  642. else {
  643. event_add (&d->c->ev, NULL);
  644. }
  645. }
  646. else if (r == 0) {
  647. /* We have written everything */
  648. d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
  649. d->c->state = KV_STATE_READ_REPLY;
  650. event_del (&d->c->ev);
  651. event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d);
  652. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  653. event_add (&d->c->ev, &d->c->tv);
  654. }
  655. else {
  656. event_add (&d->c->ev, NULL);
  657. }
  658. }
  659. else {
  660. /* Error occured during writing */
  661. cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
  662. }
  663. }
  664. else if (d->c->state == KV_STATE_WRITE_DATA) {
  665. r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
  666. if (r > 0) {
  667. /* Write more data at next call of this function */
  668. d->c->state = KV_STATE_WRITE_DATA;
  669. /* Event magic */
  670. event_del (&d->c->ev);
  671. event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_delete_cb, d);
  672. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  673. event_add (&d->c->ev, &d->c->tv);
  674. }
  675. else {
  676. event_add (&d->c->ev, NULL);
  677. }
  678. }
  679. else if (r == 0) {
  680. /* We have written everything */
  681. d->c->state = KV_STATE_READ_REPLY;
  682. d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
  683. event_del (&d->c->ev);
  684. event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d);
  685. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  686. event_add (&d->c->ev, &d->c->tv);
  687. }
  688. else {
  689. event_add (&d->c->ev, NULL);
  690. }
  691. }
  692. else {
  693. /* Error occured during writing */
  694. cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
  695. }
  696. }
  697. else if (d->c->state == KV_STATE_READ_REPLY) {
  698. /* Got something from server, try to read line */
  699. r = rspamd_kvstorage_buf_readline (d->buf, d->c);
  700. if (r == EAGAIN) {
  701. /* Read more data */
  702. event_del (&d->c->ev);
  703. event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d);
  704. if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
  705. event_add (&d->c->ev, &d->c->tv);
  706. }
  707. else {
  708. event_add (&d->c->ev, NULL);
  709. }
  710. }
  711. else if (r == 0) {
  712. d->c->state = KV_STATE_CONNECTED;
  713. cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->c, d->ud);
  714. }
  715. else {
  716. /* Error occured during reading reply line */
  717. cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
  718. }
  719. }
  720. }
  721. /**
  722. * Create async connection with rspamd
  723. * @param host hostname, ip or unix socket for a server
  724. * @param port port number in host byte order
  725. * @param tv timeout for operations
  726. * @param cb callback
  727. * @param ud user data for callback
  728. * @param conn target connection
  729. */
  730. enum rspamd_kvstorage_error
  731. rspamd_kvstorage_connect_async (const gchar *host,
  732. guint16 port, struct timeval *tv, kvstorage_connect_cb cb, gpointer ud,
  733. struct rspamd_kvstorage_connection **conn)
  734. {
  735. struct rspamd_kvstorage_connection *new;
  736. struct rspamd_kvstorage_async_data *data;
  737. gint sock;
  738. /* Here we do NOT try to resolve hostname */
  739. if ((sock = make_universal_socket (host, port, SOCK_STREAM, TRUE, FALSE, TRUE)) == -1) {
  740. return KVSTORAGE_ERROR_SERVER_ERROR;
  741. }
  742. /* Allocate new connection structure */
  743. new = g_malloc (sizeof (struct rspamd_kvstorage_connection));
  744. /* Set fields */
  745. new->sock = sock;
  746. new->state = KV_STATE_NONE;
  747. new->asynced = TRUE;
  748. if (tv != NULL) {
  749. memcpy (&new->tv, tv, sizeof (struct timeval));
  750. }
  751. else {
  752. memset (&new->tv, 0, sizeof (struct timeval));
  753. }
  754. new->conn_cb = cb;
  755. new->pool = memory_pool_new (memory_pool_get_size ());
  756. data = memory_pool_alloc (new->pool, sizeof (struct rspamd_kvstorage_async_data));
  757. data->ud = ud;
  758. data->c = new;
  759. /* Set event */
  760. event_set (&new->ev, new->sock, EV_WRITE, rspamd_kvstorage_connect_cb, data);
  761. if (tv != NULL) {
  762. event_add (&new->ev, &new->tv);
  763. }
  764. else {
  765. event_add (&new->ev, NULL);
  766. }
  767. *conn = new;
  768. return KVSTORAGE_ERROR_OK;
  769. }
  770. /**
  771. * Read key asynced
  772. * @param conn connection structure
  773. * @param key key to read
  774. * @param cb callback
  775. * @param ud user data for callback
  776. */
  777. enum rspamd_kvstorage_error
  778. rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn,
  779. const gpointer key, guint keylen, kvstorage_read_cb cb, gpointer ud)
  780. {
  781. struct rspamd_kvstorage_async_data *d;
  782. if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
  783. return KVSTORAGE_ERROR_INTERNAL_ERROR;
  784. }
  785. else {
  786. conn->read_cb = cb;
  787. d = memory_pool_alloc (conn->pool, sizeof (struct rspamd_kvstorage_async_data));
  788. d->ud = ud;
  789. d->c = conn;
  790. d->ud = ud;
  791. d->key = memory_pool_strdup (conn->pool, key);
  792. d->keylen = keylen;
  793. conn->state = KV_STATE_GET;
  794. /* Set event */
  795. event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_read_cb, d);
  796. if (conn->tv.tv_sec != 0) {
  797. event_add (&conn->ev, &conn->tv);
  798. }
  799. else {
  800. event_add (&conn->ev, NULL);
  801. }
  802. }
  803. return KVSTORAGE_ERROR_OK;
  804. }
  805. /**
  806. * Write key asynced
  807. * @param conn connection structure
  808. * @param key key to set
  809. * @param value data to write
  810. * @param cb callback
  811. * @param ud user data for callback
  812. */
  813. enum rspamd_kvstorage_error
  814. rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn,
  815. const gpointer key, guint keylen, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb,
  816. gpointer ud)
  817. {
  818. struct rspamd_kvstorage_async_data *d;
  819. if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
  820. return KVSTORAGE_ERROR_INTERNAL_ERROR;
  821. }
  822. else {
  823. conn->write_cb = cb;
  824. d = memory_pool_alloc (conn->pool, sizeof (struct rspamd_kvstorage_async_data));
  825. d->ud = ud;
  826. d->c = conn;
  827. d->ud = ud;
  828. d->key = memory_pool_strdup (conn->pool, key);
  829. d->keylen = keylen;
  830. d->data = value;
  831. d->datalen = len;
  832. conn->state = KV_STATE_SET;
  833. /* Set event */
  834. event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_write_cb, d);
  835. if (conn->tv.tv_sec != 0) {
  836. event_add (&conn->ev, &conn->tv);
  837. }
  838. else {
  839. event_add (&conn->ev, NULL);
  840. }
  841. }
  842. return KVSTORAGE_ERROR_OK;
  843. }
  844. /**
  845. * Delete key asynced
  846. * @param conn connection structure
  847. * @param key key to delete
  848. * @param cb callback
  849. * @param ud user data for callback
  850. */
  851. enum rspamd_kvstorage_error
  852. rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn,
  853. const gpointer key, guint keylen, kvstorage_write_cb cb, gpointer ud)
  854. {
  855. struct rspamd_kvstorage_async_data *d;
  856. if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
  857. return KVSTORAGE_ERROR_INTERNAL_ERROR;
  858. }
  859. else {
  860. conn->write_cb = cb;
  861. d = memory_pool_alloc (conn->pool, sizeof (struct rspamd_kvstorage_async_data));
  862. d->ud = ud;
  863. d->c = conn;
  864. d->ud = ud;
  865. d->key = memory_pool_strdup (conn->pool, key);
  866. d->keylen = keylen;
  867. conn->state = KV_STATE_SET;
  868. /* Set event */
  869. event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_delete_cb, d);
  870. if (conn->tv.tv_sec != 0) {
  871. event_add (&conn->ev, &conn->tv);
  872. }
  873. else {
  874. event_add (&conn->ev, NULL);
  875. }
  876. }
  877. return KVSTORAGE_ERROR_OK;
  878. }
  879. /**
  880. * Close connection
  881. * @param conn connection structure
  882. */
  883. enum rspamd_kvstorage_error
  884. rspamd_kvstorage_close_async (struct rspamd_kvstorage_connection *conn)
  885. {
  886. close (conn->sock);
  887. memory_pool_delete (conn->pool);
  888. event_del (&conn->ev);
  889. g_free (conn);
  890. return KVSTORAGE_ERROR_OK;
  891. }
  892. /* Synced API */
  893. /**
  894. * Create sync connection with rspamd
  895. * @param host hostname, ip or unix socket for a server
  896. * @param port port number in host byte order
  897. * @param tv timeout for operations
  898. * @param conn target connection
  899. */
  900. enum rspamd_kvstorage_error
  901. rspamd_kvstorage_connect_sync (const gchar *host,
  902. guint16 port, struct timeval *tv,
  903. struct rspamd_kvstorage_connection **conn)
  904. {
  905. struct rspamd_kvstorage_connection *new;
  906. gint sock;
  907. if ((sock = make_universal_socket (host, port, SOCK_STREAM, FALSE, FALSE, TRUE)) == -1) {
  908. return KVSTORAGE_ERROR_INTERNAL_ERROR;
  909. }
  910. /* Allocate new connection structure */
  911. new = g_malloc (sizeof (struct rspamd_kvstorage_connection));
  912. /* Set fields */
  913. new->sock = sock;
  914. new->state = KV_STATE_CONNECTED;
  915. new->asynced = FALSE;
  916. if (tv != NULL) {
  917. memcpy (&new->tv, tv, sizeof (struct timeval));
  918. }
  919. else {
  920. memset (&new->tv, 0, sizeof (struct timeval));
  921. }
  922. new->pool = memory_pool_new (memory_pool_get_size ());
  923. *conn = new;
  924. return KVSTORAGE_ERROR_OK;
  925. }
  926. /**
  927. * Read key synced
  928. * @param conn connection structure
  929. * @param key key to read
  930. * @param value value readed
  931. */
  932. enum rspamd_kvstorage_error
  933. rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
  934. const gpointer key, guint keylen, gpointer **value, guint *len)
  935. {
  936. struct kvstorage_buf *buf, *databuf;
  937. gint r;
  938. guint flags;
  939. if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
  940. return KVSTORAGE_ERROR_INTERNAL_ERROR;
  941. }
  942. buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, conn->pool);
  943. r = rspamd_snprintf (buf->data, buf->len, "get %*s" CRLF, keylen, key);
  944. buf->len = r;
  945. while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) {
  946. poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT);
  947. }
  948. if (r == -1) {
  949. return KVSTORAGE_ERROR_INTERNAL_ERROR;
  950. }
  951. /* Now read reply and try to parse line */
  952. buf->len = MAX_KV_LINE;
  953. buf->pos = 0;
  954. while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) {
  955. poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN);
  956. }
  957. /* A line was read */
  958. if (r == 0) {
  959. if ((r = rspamd_kvstorage_parse_get_line (buf, len, &flags)) != KVSTORAGE_ERROR_OK) {
  960. return r;
  961. }
  962. rspamd_kvstorage_buf_drainline (buf);
  963. /* Now allocate and read the data */
  964. databuf = rspamd_kvstorage_buf_create (*len, conn->pool);
  965. memcpy (databuf->data, buf->data, buf->pos);
  966. while ((r = rspamd_kvstorage_buf_readall (databuf, conn)) > 0) {
  967. poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN);
  968. }
  969. if (r == -1) {
  970. return KVSTORAGE_ERROR_INTERNAL_ERROR;
  971. }
  972. /* Now we have data inside buffer, read the last line */
  973. buf->pos = 0;
  974. while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) {
  975. poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN);
  976. }
  977. *value = (gpointer)buf->data;
  978. }
  979. return KVSTORAGE_ERROR_OK;
  980. }
  981. /**
  982. * Write key synced
  983. * @param conn connection structure
  984. * @param key key to set
  985. * @param value data to write
  986. */
  987. enum rspamd_kvstorage_error
  988. rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn,
  989. const gpointer key, guint keylen, const gpointer value, gsize len, guint expire)
  990. {
  991. struct kvstorage_buf *buf;
  992. gint r, buflen;
  993. if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
  994. return KVSTORAGE_ERROR_INTERNAL_ERROR;
  995. }
  996. /* Create buf */
  997. buflen = len + keylen + sizeof ("set 4294967296 4294967296 4294967296" CRLF);
  998. buf = rspamd_kvstorage_buf_create (buflen, conn->pool);
  999. r = rspamd_snprintf (buf->data, buf->len, "set %*s %ud %ud %ud" CRLF "%*s",
  1000. keylen, key, 0, expire, len, len, value);
  1001. buf->len = r;
  1002. while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) {
  1003. poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT);
  1004. }
  1005. if (r == -1) {
  1006. return KVSTORAGE_ERROR_INTERNAL_ERROR;
  1007. }
  1008. /* Now we can read reply */
  1009. buf->pos = 0;
  1010. buf->len = buflen;
  1011. while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) {
  1012. poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN);
  1013. }
  1014. return rspamd_kvstorage_parse_reply_error (buf);
  1015. }
  1016. /**
  1017. * Delete key synced
  1018. * @param conn connection structure
  1019. * @param key key to delete
  1020. */
  1021. enum rspamd_kvstorage_error
  1022. rspamd_kvstorage_delete_sync (struct rspamd_kvstorage_connection *conn,
  1023. const gpointer key, guint keylen)
  1024. {
  1025. struct kvstorage_buf *buf;
  1026. gint r, buflen;
  1027. if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
  1028. return KVSTORAGE_ERROR_INTERNAL_ERROR;
  1029. }
  1030. /* Create buf */
  1031. buflen = MAX (keylen + sizeof ("delete " CRLF), MAX_KV_LINE);
  1032. buf = rspamd_kvstorage_buf_create (buflen, conn->pool);
  1033. r = rspamd_snprintf (buf->data, buf->len, "delete %*s" CRLF,
  1034. keylen, key);
  1035. buf->len = r;
  1036. while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) {
  1037. poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT);
  1038. }
  1039. if (r == -1) {
  1040. return KVSTORAGE_ERROR_INTERNAL_ERROR;
  1041. }
  1042. /* Now we can read reply */
  1043. buf->len = buflen;
  1044. buf->pos = 0;
  1045. while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) {
  1046. poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN);
  1047. }
  1048. return rspamd_kvstorage_parse_reply_error (buf);
  1049. }
  1050. /**
  1051. * Close connection
  1052. * @param conn connection structure
  1053. */
  1054. enum rspamd_kvstorage_error
  1055. rspamd_kvstorage_close_sync (struct rspamd_kvstorage_connection *conn)
  1056. {
  1057. close (conn->sock);
  1058. memory_pool_delete (conn->pool);
  1059. g_free (conn);
  1060. return KVSTORAGE_ERROR_OK;
  1061. }
  1062. const gchar*
  1063. rspamd_kvstorage_strerror (enum rspamd_kvstorage_error err)
  1064. {
  1065. switch (err) {
  1066. case KVSTORAGE_ERROR_OK:
  1067. return "operation completed";
  1068. case KVSTORAGE_ERROR_TIMEOUT:
  1069. return "operation timeout";
  1070. case KVSTORAGE_ERROR_NOT_FOUND:
  1071. return "key not found";
  1072. case KVSTORAGE_ERROR_NOT_STORED:
  1073. return "key not stored";
  1074. case KVSTORAGE_ERROR_EXISTS:
  1075. return "key exists";
  1076. case KVSTORAGE_ERROR_SERVER_ERROR:
  1077. return "server error";
  1078. case KVSTORAGE_ERROR_CLIENT_ERROR:
  1079. return "client error";
  1080. case KVSTORAGE_ERROR_INTERNAL_ERROR:
  1081. return "library error";
  1082. }
  1083. /* Not reached */
  1084. return "unknown error";
  1085. }