Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802
  1. /*
  2. * Copyright (c) 2009-2012, Vsevolod Stakhov
  3. * All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. * * Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * * Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. *
  13. * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
  14. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  15. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  16. * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
  17. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  18. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  19. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  20. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  21. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  22. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  23. */
  24. #include "config.h"
  25. #include "buffer.h"
  26. #include "main.h"
  27. #define G_DISPATCHER_ERROR dispatcher_error_quark()
  28. #define debug_ip(...) rspamd_conditional_debug(rspamd_main->logger, d->peer_addr, __FUNCTION__, __VA_ARGS__)
  29. static void dispatcher_cb (gint fd, short what, void *arg);
  30. static inline GQuark
  31. dispatcher_error_quark (void)
  32. {
  33. return g_quark_from_static_string ("g-dispatcher-error-quark");
  34. }
  35. static gboolean
  36. sendfile_callback (rspamd_io_dispatcher_t *d)
  37. {
  38. GError *err;
  39. #ifdef HAVE_SENDFILE
  40. # if defined(FREEBSD) || defined(DARWIN)
  41. off_t off = 0;
  42. #if defined(FREEBSD)
  43. /* FreeBSD version */
  44. if (sendfile (d->sendfile_fd, d->fd, d->offset, 0, NULL, &off, 0) != 0) {
  45. #elif defined(DARWIN)
  46. /* Darwin version */
  47. if (sendfile (d->sendfile_fd, d->fd, d->offset, &off, NULL, 0) != 0) {
  48. #endif
  49. if (errno != EAGAIN) {
  50. if (d->err_callback) {
  51. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  52. d->err_callback (err, d->user_data);
  53. return FALSE;
  54. }
  55. }
  56. else {
  57. debug_ip("partially write data, retry");
  58. /* Wait for other event */
  59. d->offset += off;
  60. event_del (d->ev);
  61. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  62. event_base_set (d->ev_base, d->ev);
  63. event_add (d->ev, d->tv);
  64. }
  65. }
  66. else {
  67. if (d->write_callback) {
  68. if (!d->write_callback (d->user_data)) {
  69. debug_ip("callback set wanna_die flag, terminating");
  70. return FALSE;
  71. }
  72. }
  73. event_del (d->ev);
  74. event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
  75. event_base_set (d->ev_base, d->ev);
  76. event_add (d->ev, d->tv);
  77. d->in_sendfile = FALSE;
  78. }
  79. # else
  80. ssize_t r;
  81. /* Linux version */
  82. r = sendfile (d->fd, d->sendfile_fd, &d->offset, d->file_size);
  83. if (r == -1) {
  84. if (errno != EAGAIN) {
  85. if (d->err_callback) {
  86. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  87. d->err_callback (err, d->user_data);
  88. return FALSE;
  89. }
  90. }
  91. else {
  92. debug_ip("partially write data, retry");
  93. /* Wait for other event */
  94. event_del (d->ev);
  95. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  96. event_base_set (d->ev_base, d->ev);
  97. event_add (d->ev, d->tv);
  98. }
  99. }
  100. else if (r + d->offset < (ssize_t)d->file_size) {
  101. debug_ip("partially write data, retry");
  102. /* Wait for other event */
  103. event_del (d->ev);
  104. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  105. event_base_set (d->ev_base, d->ev);
  106. event_add (d->ev, d->tv);
  107. }
  108. else {
  109. if (d->write_callback) {
  110. if (!d->write_callback (d->user_data)) {
  111. debug_ip("callback set wanna_die flag, terminating");
  112. return FALSE;
  113. }
  114. }
  115. event_del (d->ev);
  116. event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
  117. event_base_set (d->ev_base, d->ev);
  118. event_add (d->ev, d->tv);
  119. d->in_sendfile = FALSE;
  120. }
  121. # endif
  122. #else
  123. ssize_t r;
  124. r = write (d->fd, d->map, d->file_size - d->offset);
  125. if (r == -1) {
  126. if (errno != EAGAIN) {
  127. if (d->err_callback) {
  128. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  129. d->err_callback (err, d->user_data);
  130. return FALSE;
  131. }
  132. }
  133. else {
  134. debug_ip("partially write data, retry");
  135. /* Wait for other event */
  136. event_del (d->ev);
  137. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  138. event_base_set (d->ev_base, d->ev);
  139. event_add (d->ev, d->tv);
  140. }
  141. }
  142. else if (r + d->offset < d->file_size) {
  143. d->offset += r;
  144. debug_ip("partially write data, retry");
  145. /* Wait for other event */
  146. event_del (d->ev);
  147. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  148. event_base_set (d->ev_base, d->ev);
  149. event_add (d->ev, d->tv);
  150. }
  151. else {
  152. if (d->write_callback) {
  153. if (!d->write_callback (d->user_data)) {
  154. debug_ip("callback set wanna_die flag, terminating");
  155. return FALSE;
  156. }
  157. }
  158. event_del (d->ev);
  159. event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
  160. event_base_set (d->ev_base, d->ev);
  161. event_add (d->ev, d->tv);
  162. d->in_sendfile = FALSE;
  163. }
  164. #endif
  165. return TRUE;
  166. }
  167. #define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
  168. static gboolean
  169. write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
  170. {
  171. GList *cur = NULL, *tmp;
  172. GError *err = NULL;
  173. rspamd_buffer_t *buf;
  174. ssize_t r;
  175. struct iovec *iov;
  176. guint i, len, blen;
  177. len = g_queue_get_length (d->out_buffers);
  178. if (len > 1) {
  179. /* IOV version */
  180. /* Unset delayed as actually we HAVE buffers to write */
  181. is_delayed = TRUE;
  182. cur = d->out_buffers->tail;
  183. iov = g_slice_alloc (len * sizeof (struct iovec));
  184. i = 0;
  185. while (cur) {
  186. buf = cur->data;
  187. blen = BUFREMAIN (buf);
  188. if (blen > 0) {
  189. iov[i].iov_base = buf->pos;
  190. iov[i].iov_len = blen;
  191. }
  192. else {
  193. iov[i].iov_base = NULL;
  194. iov[i].iov_len = 0;
  195. }
  196. i ++;
  197. cur = g_list_previous (cur);
  198. }
  199. /* Now try to write the whole vector */
  200. r = writev (fd, iov, len);
  201. if (r == -1 && errno != EAGAIN) {
  202. g_slice_free1 (len * sizeof (struct iovec), iov);
  203. if (d->err_callback) {
  204. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  205. d->err_callback (err, d->user_data);
  206. return FALSE;
  207. }
  208. }
  209. else if (r > 0) {
  210. /* Find pos inside buffers */
  211. cur = d->out_buffers->tail;
  212. i = 0;
  213. while (cur) {
  214. buf = cur->data;
  215. blen = BUFREMAIN (buf);
  216. if (r >= (ssize_t)blen) {
  217. tmp = cur;
  218. cur = g_list_previous (cur);
  219. /* Mark this buffer as read */
  220. g_queue_delete_link (d->out_buffers, tmp);
  221. r -= blen;
  222. }
  223. else {
  224. /* This buffer was not written completely */
  225. buf->pos += r;
  226. break;
  227. }
  228. i ++;
  229. cur = g_list_previous (cur);
  230. }
  231. g_slice_free1 (len * sizeof (struct iovec), iov);
  232. if (cur != 0) {
  233. /* Wait for other event */
  234. event_del (d->ev);
  235. event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
  236. event_base_set (d->ev_base, d->ev);
  237. event_add (d->ev, d->tv);
  238. return TRUE;
  239. }
  240. }
  241. else if (r == 0) {
  242. /* Got EOF while we wait for data */
  243. g_slice_free1 (len * sizeof (struct iovec), iov);
  244. if (d->err_callback) {
  245. err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
  246. d->err_callback (err, d->user_data);
  247. return FALSE;
  248. }
  249. }
  250. else if (r == -1 && errno == EAGAIN) {
  251. g_slice_free1 (len * sizeof (struct iovec), iov);
  252. debug_ip("partially write data, retry");
  253. /* Wait for other event */
  254. event_del (d->ev);
  255. event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
  256. event_base_set (d->ev_base, d->ev);
  257. event_add (d->ev, d->tv);
  258. return TRUE;
  259. }
  260. }
  261. else if (len == 1) {
  262. /* Single write version */
  263. buf = d->out_buffers->head->data;
  264. r = write (fd, buf->pos, BUFREMAIN (buf));
  265. if (r == -1 && errno != EAGAIN) {
  266. if (d->err_callback) {
  267. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  268. d->err_callback (err, d->user_data);
  269. return FALSE;
  270. }
  271. }
  272. else if (r > 0) {
  273. buf->pos += r;
  274. if (BUFREMAIN (buf) != 0) {
  275. /* Continue with this buffer */
  276. debug_ip("wrote %z bytes of %z", r, buf->data->len);
  277. return write_buffers (fd, d, is_delayed);
  278. }
  279. }
  280. else if (r == 0) {
  281. /* Got EOF while we wait for data */
  282. if (d->err_callback) {
  283. err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
  284. d->err_callback (err, d->user_data);
  285. return FALSE;
  286. }
  287. }
  288. else if (r == -1 && errno == EAGAIN) {
  289. debug_ip("partially write data, retry");
  290. /* Wait for other event */
  291. event_del (d->ev);
  292. event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
  293. event_base_set (d->ev_base, d->ev);
  294. event_add (d->ev, d->tv);
  295. return TRUE;
  296. }
  297. }
  298. if (cur == NULL) {
  299. /* Disable write event for this time */
  300. g_queue_clear (d->out_buffers);
  301. debug_ip("all buffers were written successfully");
  302. if (is_delayed && d->write_callback) {
  303. if (!d->write_callback (d->user_data)) {
  304. debug_ip("callback set wanna_die flag, terminating");
  305. return FALSE;
  306. }
  307. }
  308. event_del (d->ev);
  309. event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
  310. event_base_set (d->ev_base, d->ev);
  311. event_add (d->ev, d->tv);
  312. }
  313. else {
  314. /* Plan other write event */
  315. event_del (d->ev);
  316. event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
  317. event_base_set (d->ev_base, d->ev);
  318. event_add (d->ev, d->tv);
  319. }
  320. return TRUE;
  321. }
  322. static void
  323. read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
  324. {
  325. ssize_t r;
  326. GError *err = NULL;
  327. f_str_t res;
  328. gchar *c, *b;
  329. gchar *end;
  330. size_t len;
  331. enum io_policy saved_policy;
  332. if (d->wanna_die) {
  333. rspamd_remove_dispatcher (d);
  334. return;
  335. }
  336. if (d->in_buf == NULL) {
  337. d->in_buf = memory_pool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t));
  338. if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) {
  339. d->in_buf->data = fstralloc_tmp (d->pool, BUFSIZ);
  340. }
  341. else {
  342. d->in_buf->data = fstralloc_tmp (d->pool, d->nchars + 1);
  343. }
  344. d->in_buf->pos = d->in_buf->data->begin;
  345. }
  346. end = d->in_buf->pos;
  347. len = d->in_buf->data->len;
  348. if (BUFREMAIN (d->in_buf) == 0) {
  349. /* Buffer is full, try to call callback with overflow error */
  350. if (d->err_callback) {
  351. err = g_error_new (G_DISPATCHER_ERROR, E2BIG, "buffer overflow");
  352. d->err_callback (err, d->user_data);
  353. return;
  354. }
  355. }
  356. else if (!skip_read) {
  357. /* Try to read the whole buffer */
  358. r = read (fd, end, BUFREMAIN (d->in_buf));
  359. if (r == -1 && errno != EAGAIN) {
  360. if (d->err_callback) {
  361. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  362. d->err_callback (err, d->user_data);
  363. return;
  364. }
  365. }
  366. else if (r == 0) {
  367. /* Got EOF while we wait for data */
  368. #if 0
  369. if (d->err_callback) {
  370. err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
  371. d->err_callback (err, d->user_data);
  372. return;
  373. }
  374. #endif
  375. /* Read returned 0, it may be shutdown or full quit */
  376. if (!d->want_read) {
  377. d->half_closed = TRUE;
  378. /* Do not expect any read after this */
  379. event_del (d->ev);
  380. }
  381. else {
  382. if (d->err_callback) {
  383. err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
  384. d->err_callback (err, d->user_data);
  385. return;
  386. }
  387. }
  388. }
  389. else if (r == -1 && errno == EAGAIN) {
  390. debug_ip("partially read data, retry");
  391. return;
  392. }
  393. else {
  394. /* Set current position in buffer */
  395. d->in_buf->pos += r;
  396. d->in_buf->data->len += r;
  397. }
  398. debug_ip("read %z characters, policy is %s, watermark is: %z, buffer has %z bytes", r,
  399. d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", d->nchars, d->in_buf->data->len);
  400. }
  401. saved_policy = d->policy;
  402. c = d->in_buf->data->begin;
  403. end = d->in_buf->pos;
  404. len = d->in_buf->data->len;
  405. b = c;
  406. r = 0;
  407. switch (d->policy) {
  408. case BUFFER_LINE:
  409. /** Variables:
  410. * b - begin of line
  411. * r - current position in buffer
  412. * *len - length of remaining buffer
  413. * c - pointer to current position (buffer->begin + r)
  414. * res - result string
  415. */
  416. while (r < (ssize_t)len) {
  417. if (*c == '\n') {
  418. res.begin = b;
  419. res.len = c - b;
  420. /* Strip EOL */
  421. if (d->strip_eol) {
  422. if (r != 0 && *(c - 1) == '\r') {
  423. res.len--;
  424. }
  425. }
  426. else {
  427. /* Include EOL in reply */
  428. res.len ++;
  429. }
  430. /* Call callback for a line */
  431. if (d->read_callback) {
  432. if (!d->read_callback (&res, d->user_data)) {
  433. return;
  434. }
  435. if (d->policy != saved_policy) {
  436. /* Drain buffer as policy is changed */
  437. /* Note that d->in_buffer is other pointer now, so we need to reinit all pointers */
  438. /* First detect how much symbols do we have */
  439. if (end == c) {
  440. /* In fact we read the whole buffer and change input policy, so just set current pos to begin of buffer */
  441. d->in_buf->pos = d->in_buf->data->begin;
  442. d->in_buf->data->len = 0;
  443. }
  444. else {
  445. /* Otherwise we need to move buffer */
  446. /* Reinit pointers */
  447. len = d->in_buf->data->len - r - 1;
  448. end = d->in_buf->data->begin + r + 1;
  449. memmove (d->in_buf->data->begin, end, len);
  450. d->in_buf->data->len = len;
  451. d->in_buf->pos = d->in_buf->data->begin + len;
  452. /* Process remaining buffer */
  453. read_buffers (fd, d, TRUE);
  454. }
  455. return;
  456. }
  457. }
  458. /* Set new begin of line */
  459. b = c + 1;
  460. }
  461. r++;
  462. c++;
  463. }
  464. /* Now drain remaining characters in buffer */
  465. memmove (d->in_buf->data->begin, b, c - b);
  466. d->in_buf->data->len = c - b;
  467. d->in_buf->pos = d->in_buf->data->begin + (c - b);
  468. break;
  469. case BUFFER_CHARACTER:
  470. r = d->nchars;
  471. if ((ssize_t)len >= r) {
  472. res.begin = b;
  473. res.len = r;
  474. c = b + r;
  475. if (d->read_callback) {
  476. if (!d->read_callback (&res, d->user_data)) {
  477. return;
  478. }
  479. /* Move remaining string to begin of buffer (draining) */
  480. if ((ssize_t)len > r) {
  481. len -= r;
  482. memmove (d->in_buf->data->begin, c, len);
  483. d->in_buf->data->len = len;
  484. d->in_buf->pos = d->in_buf->data->begin + len;
  485. b = d->in_buf->data->begin;
  486. }
  487. else {
  488. d->in_buf->data->len = 0;
  489. d->in_buf->pos = d->in_buf->data->begin;
  490. }
  491. if (d->policy != saved_policy && (ssize_t)len != r) {
  492. debug_ip("policy changed during callback, restart buffer's processing");
  493. read_buffers (fd, d, TRUE);
  494. return;
  495. }
  496. }
  497. }
  498. break;
  499. case BUFFER_ANY:
  500. res.begin = d->in_buf->data->begin;
  501. res.len = len;
  502. if (d->read_callback) {
  503. /*
  504. * Actually we do not want to send zero sized
  505. * buffers to a read callback
  506. */
  507. if (! (d->want_read && res.len == 0)) {
  508. if (!d->read_callback (&res, d->user_data)) {
  509. return;
  510. }
  511. }
  512. if (d->policy != saved_policy) {
  513. debug_ip("policy changed during callback, restart buffer's processing");
  514. read_buffers (fd, d, TRUE);
  515. return;
  516. }
  517. }
  518. d->in_buf->pos = d->in_buf->data->begin;
  519. d->in_buf->data->len = 0;
  520. break;
  521. }
  522. }
  523. #undef BUFREMAIN
  524. static void
  525. dispatcher_cb (gint fd, short what, void *arg)
  526. {
  527. rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *) arg;
  528. GError *err = NULL;
  529. debug_ip("in dispatcher callback, what: %d, fd: %d", (gint)what, fd);
  530. if ((what & EV_TIMEOUT) != 0) {
  531. if (d->err_callback) {
  532. err = g_error_new (G_DISPATCHER_ERROR, ETIMEDOUT, "IO timeout");
  533. d->err_callback (err, d->user_data);
  534. }
  535. }
  536. else if ((what & EV_READ) != 0) {
  537. read_buffers (fd, d, FALSE);
  538. }
  539. else if ((what & EV_WRITE) != 0) {
  540. /* No data to write, disable further EV_WRITE to this fd */
  541. if (d->in_sendfile) {
  542. sendfile_callback (d);
  543. }
  544. else {
  545. if (g_queue_get_length (d->out_buffers) == 0) {
  546. if (d->half_closed && !d->is_restored) {
  547. /* Socket is half closed and there is nothing more to write, closing connection */
  548. if (d->err_callback) {
  549. err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
  550. d->err_callback (err, d->user_data);
  551. return;
  552. }
  553. }
  554. else {
  555. /* Want read again */
  556. event_del (d->ev);
  557. event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
  558. event_base_set (d->ev_base, d->ev);
  559. event_add (d->ev, d->tv);
  560. if (d->is_restored && d->write_callback) {
  561. if (!d->write_callback (d->user_data)) {
  562. return;
  563. }
  564. d->is_restored = FALSE;
  565. }
  566. }
  567. }
  568. else {
  569. /* Delayed write */
  570. write_buffers (fd, d, TRUE);
  571. }
  572. }
  573. }
  574. }
  575. rspamd_io_dispatcher_t *
  576. rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy policy,
  577. dispatcher_read_callback_t read_cb, dispatcher_write_callback_t write_cb, dispatcher_err_callback_t err_cb, struct timeval *tv, void *user_data)
  578. {
  579. rspamd_io_dispatcher_t *new;
  580. if (fd == -1) {
  581. return NULL;
  582. }
  583. new = g_malloc (sizeof (rspamd_io_dispatcher_t));
  584. bzero (new, sizeof (rspamd_io_dispatcher_t));
  585. new->pool = memory_pool_new (memory_pool_get_size ());
  586. if (tv != NULL) {
  587. new->tv = memory_pool_alloc (new->pool, sizeof (struct timeval));
  588. memcpy (new->tv, tv, sizeof (struct timeval));
  589. }
  590. else {
  591. new->tv = NULL;
  592. }
  593. new->nchars = 0;
  594. new->in_sendfile = FALSE;
  595. new->policy = policy;
  596. new->read_callback = read_cb;
  597. new->write_callback = write_cb;
  598. new->err_callback = err_cb;
  599. new->user_data = user_data;
  600. new->strip_eol = TRUE;
  601. new->half_closed = FALSE;
  602. new->want_read = TRUE;
  603. new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
  604. new->fd = fd;
  605. new->ev_base = base;
  606. new->out_buffers = g_queue_new ();
  607. event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
  608. event_base_set (new->ev_base, new->ev);
  609. event_add (new->ev, new->tv);
  610. return new;
  611. }
  612. void
  613. rspamd_remove_dispatcher (rspamd_io_dispatcher_t * dispatcher)
  614. {
  615. if (dispatcher != NULL) {
  616. event_del (dispatcher->ev);
  617. memory_pool_delete (dispatcher->pool);
  618. g_queue_free (dispatcher->out_buffers);
  619. g_free (dispatcher);
  620. }
  621. }
  622. void
  623. rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy, size_t nchars)
  624. {
  625. f_str_t *tmp;
  626. gint t;
  627. if (d->policy != policy || nchars != d->nchars) {
  628. d->policy = policy;
  629. d->nchars = nchars ? nchars : BUFSIZ;
  630. /* Resize input buffer if needed */
  631. if (policy == BUFFER_CHARACTER && nchars != 0) {
  632. if (d->in_buf && d->in_buf->data->size < nchars) {
  633. tmp = fstralloc_tmp (d->pool, d->nchars + 1);
  634. memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
  635. t = d->in_buf->pos - d->in_buf->data->begin;
  636. tmp->len = d->in_buf->data->len;
  637. d->in_buf->data = tmp;
  638. d->in_buf->pos = d->in_buf->data->begin + t;
  639. }
  640. }
  641. else if (policy == BUFFER_LINE || policy == BUFFER_ANY) {
  642. if (d->in_buf && d->nchars < BUFSIZ) {
  643. tmp = fstralloc_tmp (d->pool, BUFSIZ);
  644. memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
  645. t = d->in_buf->pos - d->in_buf->data->begin;
  646. tmp->len = d->in_buf->data->len;
  647. d->in_buf->data = tmp;
  648. d->in_buf->pos = d->in_buf->data->begin + t;
  649. }
  650. d->strip_eol = TRUE;
  651. }
  652. }
  653. debug_ip("new input length watermark is %uz", d->nchars);
  654. }
  655. gboolean
  656. rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, const void *data, size_t len, gboolean delayed, gboolean allocated)
  657. {
  658. rspamd_buffer_t *newbuf;
  659. newbuf = memory_pool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t));
  660. if (len == 0) {
  661. /* Assume NULL terminated */
  662. len = strlen ((const gchar *)data);
  663. }
  664. if (!allocated) {
  665. newbuf->data = memory_pool_alloc_tmp (d->pool, sizeof (f_str_t));
  666. newbuf->data->begin = memory_pool_alloc_tmp (d->pool, len);
  667. newbuf->data->size = len;
  668. newbuf->data->len = len;
  669. /* We need to copy data to temporary internal buffer to avoid using of stack variables */
  670. memcpy (newbuf->data->begin, data, len);
  671. }
  672. else {
  673. newbuf->data = memory_pool_alloc_tmp (d->pool, sizeof (f_str_t));
  674. newbuf->data->begin = (gchar *)data;
  675. newbuf->data->size = len;
  676. }
  677. newbuf->pos = newbuf->data->begin;
  678. newbuf->data->len = len;
  679. g_queue_push_head (d->out_buffers, newbuf);
  680. if (!delayed) {
  681. debug_ip("plan write event");
  682. return write_buffers (d->fd, d, FALSE);
  683. }
  684. /* Otherwise plan write event */
  685. event_del (d->ev);
  686. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  687. event_base_set (d->ev_base, d->ev);
  688. event_add (d->ev, d->tv);
  689. return TRUE;
  690. }
  691. gboolean
  692. rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, gint fd, size_t len)
  693. {
  694. if (lseek (fd, 0, SEEK_SET) == -1) {
  695. msg_warn ("lseek failed: %s", strerror (errno));
  696. return FALSE;
  697. }
  698. d->offset = 0;
  699. d->in_sendfile = TRUE;
  700. d->sendfile_fd = fd;
  701. d->file_size = len;
  702. #ifndef HAVE_SENDFILE
  703. #ifdef HAVE_MMAP_NOCORE
  704. if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd, 0)) == MAP_FAILED) {
  705. #else
  706. if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) {
  707. #endif
  708. msg_warn ("mmap failed: %s", strerror (errno));
  709. return FALSE;
  710. }
  711. #endif
  712. return sendfile_callback (d);
  713. }
  714. void
  715. rspamd_dispatcher_pause (rspamd_io_dispatcher_t * d)
  716. {
  717. debug_ip ("paused dispatcher");
  718. event_del (d->ev);
  719. d->is_restored = FALSE;
  720. }
  721. void
  722. rspamd_dispatcher_restore (rspamd_io_dispatcher_t * d)
  723. {
  724. if (!d->is_restored) {
  725. debug_ip ("restored dispatcher");
  726. event_del (d->ev);
  727. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, d);
  728. event_base_set (d->ev_base, d->ev);
  729. event_add (d->ev, d->tv);
  730. d->is_restored = TRUE;
  731. }
  732. }
  733. void
  734. rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *d)
  735. {
  736. g_queue_clear (d->out_buffers);
  737. /* Cleanup temporary data */
  738. memory_pool_cleanup_tmp (d->pool);
  739. d->in_buf = NULL;
  740. }
  741. #undef debug_ip
  742. /*
  743. * vi:ts=4
  744. */