You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786
  1. /*
  2. * Copyright (c) 2009-2012, Vsevolod Stakhov
  3. * All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. * * Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * * Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. *
  13. * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
  14. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  15. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  16. * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
  17. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  18. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  19. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  20. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  21. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  22. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  23. */
  24. #include "config.h"
  25. #include "buffer.h"
  26. #include "main.h"
  27. #ifdef HAVE_SYS_SENDFILE_H
  28. #include <sys/sendfile.h>
  29. #endif
  30. #define G_DISPATCHER_ERROR dispatcher_error_quark()
  31. #define debug_ip(...) rspamd_conditional_debug(rspamd_main->logger, d->peer_addr, __FUNCTION__, __VA_ARGS__)
  32. static void dispatcher_cb (gint fd, short what, void *arg);
  33. static inline GQuark
  34. dispatcher_error_quark (void)
  35. {
  36. return g_quark_from_static_string ("g-dispatcher-error-quark");
  37. }
  38. static gboolean
  39. sendfile_callback (rspamd_io_dispatcher_t *d)
  40. {
  41. GError *err;
  42. #ifdef HAVE_SENDFILE
  43. # if defined(FREEBSD) || defined(DARWIN)
  44. off_t off = 0;
  45. #if defined(FREEBSD)
  46. /* FreeBSD version */
  47. if (sendfile (d->sendfile_fd, d->fd, d->offset, 0, NULL, &off, 0) != 0) {
  48. #elif defined(DARWIN)
  49. /* Darwin version */
  50. if (sendfile (d->sendfile_fd, d->fd, d->offset, &off, NULL, 0) != 0) {
  51. #endif
  52. if (errno != EAGAIN) {
  53. if (d->err_callback) {
  54. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  55. d->err_callback (err, d->user_data);
  56. return FALSE;
  57. }
  58. }
  59. else {
  60. debug_ip("partially write data, retry");
  61. /* Wait for other event */
  62. d->offset += off;
  63. event_del (d->ev);
  64. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  65. event_base_set (d->ev_base, d->ev);
  66. event_add (d->ev, d->tv);
  67. }
  68. }
  69. else {
  70. if (d->write_callback) {
  71. if (!d->write_callback (d->user_data)) {
  72. debug_ip("callback set wanna_die flag, terminating");
  73. return FALSE;
  74. }
  75. }
  76. event_del (d->ev);
  77. event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
  78. event_base_set (d->ev_base, d->ev);
  79. event_add (d->ev, d->tv);
  80. d->in_sendfile = FALSE;
  81. }
  82. # else
  83. ssize_t r;
  84. /* Linux version */
  85. r = sendfile (d->fd, d->sendfile_fd, &d->offset, d->file_size);
  86. if (r == -1) {
  87. if (errno != EAGAIN) {
  88. if (d->err_callback) {
  89. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  90. d->err_callback (err, d->user_data);
  91. return FALSE;
  92. }
  93. }
  94. else {
  95. debug_ip("partially write data, retry");
  96. /* Wait for other event */
  97. event_del (d->ev);
  98. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  99. event_base_set (d->ev_base, d->ev);
  100. event_add (d->ev, d->tv);
  101. }
  102. }
  103. else if (r + d->offset < (ssize_t)d->file_size) {
  104. debug_ip("partially write data, retry");
  105. /* Wait for other event */
  106. event_del (d->ev);
  107. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  108. event_base_set (d->ev_base, d->ev);
  109. event_add (d->ev, d->tv);
  110. }
  111. else {
  112. if (d->write_callback) {
  113. if (!d->write_callback (d->user_data)) {
  114. debug_ip("callback set wanna_die flag, terminating");
  115. return FALSE;
  116. }
  117. }
  118. event_del (d->ev);
  119. event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
  120. event_base_set (d->ev_base, d->ev);
  121. event_add (d->ev, d->tv);
  122. d->in_sendfile = FALSE;
  123. }
  124. # endif
  125. #else
  126. ssize_t r;
  127. r = write (d->fd, d->map, d->file_size - d->offset);
  128. if (r == -1) {
  129. if (errno != EAGAIN) {
  130. if (d->err_callback) {
  131. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  132. d->err_callback (err, d->user_data);
  133. return FALSE;
  134. }
  135. }
  136. else {
  137. debug_ip("partially write data, retry");
  138. /* Wait for other event */
  139. event_del (d->ev);
  140. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  141. event_base_set (d->ev_base, d->ev);
  142. event_add (d->ev, d->tv);
  143. }
  144. }
  145. else if (r + d->offset < d->file_size) {
  146. d->offset += r;
  147. debug_ip("partially write data, retry");
  148. /* Wait for other event */
  149. event_del (d->ev);
  150. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  151. event_base_set (d->ev_base, d->ev);
  152. event_add (d->ev, d->tv);
  153. }
  154. else {
  155. if (d->write_callback) {
  156. if (!d->write_callback (d->user_data)) {
  157. debug_ip("callback set wanna_die flag, terminating");
  158. return FALSE;
  159. }
  160. }
  161. event_del (d->ev);
  162. event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
  163. event_base_set (d->ev_base, d->ev);
  164. event_add (d->ev, d->tv);
  165. d->in_sendfile = FALSE;
  166. }
  167. #endif
  168. return TRUE;
  169. }
  170. #define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
  171. #define APPEND_OUT_BUFFER(d, buf) do { \
  172. DL_APPEND((d)->out_buffers.buffers, buf); \
  173. (d)->out_buffers.pending ++; \
  174. } while (0)
  175. #define DELETE_OUT_BUFFER(d, buf) do { \
  176. DL_DELETE((d)->out_buffers.buffers, (buf)); \
  177. g_string_free((buf->data), (buf)->allocated); \
  178. g_slice_free1(sizeof (struct rspamd_out_buffer_s), (buf)); \
  179. (d)->out_buffers.pending --; \
  180. } while (0)
  181. static gboolean
  182. write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
  183. {
  184. GError *err = NULL;
  185. struct rspamd_out_buffer_s *cur = NULL, *tmp;
  186. ssize_t r;
  187. struct iovec *iov;
  188. guint i, len;
  189. len = d->out_buffers.pending;
  190. while (len > 0) {
  191. /* Unset delayed as actually we HAVE buffers to write */
  192. is_delayed = TRUE;
  193. iov = g_slice_alloc (len * sizeof (struct iovec));
  194. i = 0;
  195. DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
  196. iov[i].iov_base = cur->data->str;
  197. iov[i].iov_len = cur->data->len;
  198. i ++;
  199. }
  200. /* Now try to write the whole vector */
  201. r = writev (fd, iov, len);
  202. if (r == -1 && errno != EAGAIN) {
  203. g_slice_free1 (len * sizeof (struct iovec), iov);
  204. if (d->err_callback) {
  205. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  206. d->err_callback (err, d->user_data);
  207. return FALSE;
  208. }
  209. }
  210. else if (r > 0) {
  211. /* Find pos inside buffers */
  212. DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
  213. if (r >= (ssize_t)cur->data->len) {
  214. /* Mark this buffer as read */
  215. r -= cur->data->len;
  216. DELETE_OUT_BUFFER (d, cur);
  217. }
  218. else {
  219. /* This buffer was not written completely */
  220. g_string_erase (cur->data, 0, r);
  221. break;
  222. }
  223. }
  224. g_slice_free1 (len * sizeof (struct iovec), iov);
  225. if (d->out_buffers.pending > 0) {
  226. /* Wait for other event */
  227. event_del (d->ev);
  228. event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
  229. event_base_set (d->ev_base, d->ev);
  230. event_add (d->ev, d->tv);
  231. return TRUE;
  232. }
  233. }
  234. else if (r == 0) {
  235. /* Got EOF while we wait for data */
  236. g_slice_free1 (len * sizeof (struct iovec), iov);
  237. if (d->err_callback) {
  238. err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
  239. d->err_callback (err, d->user_data);
  240. return FALSE;
  241. }
  242. }
  243. else if (r == -1 && errno == EAGAIN) {
  244. g_slice_free1 (len * sizeof (struct iovec), iov);
  245. debug_ip("partially write data, retry");
  246. /* Wait for other event */
  247. event_del (d->ev);
  248. event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
  249. event_base_set (d->ev_base, d->ev);
  250. event_add (d->ev, d->tv);
  251. return TRUE;
  252. }
  253. len = d->out_buffers.pending;
  254. }
  255. if (d->out_buffers.pending == 0) {
  256. /* Disable write event for this time */
  257. debug_ip ("all buffers were written successfully");
  258. if (is_delayed && d->write_callback) {
  259. if (!d->write_callback (d->user_data)) {
  260. debug_ip("callback set wanna_die flag, terminating");
  261. return FALSE;
  262. }
  263. }
  264. event_del (d->ev);
  265. event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
  266. event_base_set (d->ev_base, d->ev);
  267. event_add (d->ev, d->tv);
  268. }
  269. else {
  270. /* Plan other write event */
  271. event_del (d->ev);
  272. event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
  273. event_base_set (d->ev_base, d->ev);
  274. event_add (d->ev, d->tv);
  275. }
  276. return TRUE;
  277. }
  278. static void
  279. read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
  280. {
  281. ssize_t r;
  282. GError *err = NULL;
  283. f_str_t res;
  284. gchar *c, *b;
  285. gchar *end;
  286. size_t len;
  287. enum io_policy saved_policy;
  288. if (d->wanna_die) {
  289. rspamd_remove_dispatcher (d);
  290. return;
  291. }
  292. if (d->in_buf == NULL) {
  293. d->in_buf = memory_pool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t));
  294. if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) {
  295. d->in_buf->data = fstralloc_tmp (d->pool, d->default_buf_size);
  296. }
  297. else {
  298. d->in_buf->data = fstralloc_tmp (d->pool, d->nchars + 1);
  299. }
  300. d->in_buf->pos = d->in_buf->data->begin;
  301. }
  302. end = d->in_buf->pos;
  303. len = d->in_buf->data->len;
  304. if (BUFREMAIN (d->in_buf) == 0) {
  305. /* Buffer is full, try to call callback with overflow error */
  306. if (d->err_callback) {
  307. err = g_error_new (G_DISPATCHER_ERROR, E2BIG, "buffer overflow");
  308. d->err_callback (err, d->user_data);
  309. return;
  310. }
  311. }
  312. else if (!skip_read) {
  313. /* Try to read the whole buffer */
  314. r = read (fd, end, BUFREMAIN (d->in_buf));
  315. if (r == -1 && errno != EAGAIN) {
  316. if (d->err_callback) {
  317. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  318. d->err_callback (err, d->user_data);
  319. return;
  320. }
  321. }
  322. else if (r == 0) {
  323. /* Got EOF while we wait for data */
  324. #if 0
  325. if (d->err_callback) {
  326. err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
  327. d->err_callback (err, d->user_data);
  328. return;
  329. }
  330. #endif
  331. /* Read returned 0, it may be shutdown or full quit */
  332. if (!d->want_read) {
  333. d->half_closed = TRUE;
  334. /* Do not expect any read after this */
  335. event_del (d->ev);
  336. }
  337. else {
  338. if (d->err_callback) {
  339. err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
  340. d->err_callback (err, d->user_data);
  341. return;
  342. }
  343. }
  344. }
  345. else if (r == -1 && errno == EAGAIN) {
  346. debug_ip("partially read data, retry");
  347. return;
  348. }
  349. else {
  350. /* Set current position in buffer */
  351. d->in_buf->pos += r;
  352. d->in_buf->data->len += r;
  353. }
  354. debug_ip("read %z characters, policy is %s, watermark is: %z, buffer has %z bytes", r,
  355. d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", d->nchars, d->in_buf->data->len);
  356. }
  357. saved_policy = d->policy;
  358. c = d->in_buf->data->begin;
  359. end = d->in_buf->pos;
  360. len = d->in_buf->data->len;
  361. b = c;
  362. r = 0;
  363. switch (d->policy) {
  364. case BUFFER_LINE:
  365. /** Variables:
  366. * b - begin of line
  367. * r - current position in buffer
  368. * *len - length of remaining buffer
  369. * c - pointer to current position (buffer->begin + r)
  370. * res - result string
  371. */
  372. while (r < (ssize_t)len) {
  373. if (*c == '\n') {
  374. res.begin = b;
  375. res.len = c - b;
  376. /* Strip EOL */
  377. if (d->strip_eol) {
  378. if (r != 0 && *(c - 1) == '\r') {
  379. res.len--;
  380. }
  381. }
  382. else {
  383. /* Include EOL in reply */
  384. res.len ++;
  385. }
  386. /* Call callback for a line */
  387. if (d->read_callback) {
  388. if (!d->read_callback (&res, d->user_data)) {
  389. return;
  390. }
  391. if (d->policy != saved_policy) {
  392. /* Drain buffer as policy is changed */
  393. /* Note that d->in_buffer is other pointer now, so we need to reinit all pointers */
  394. /* First detect how much symbols do we have */
  395. if (end == c) {
  396. /* In fact we read the whole buffer and change input policy, so just set current pos to begin of buffer */
  397. d->in_buf->pos = d->in_buf->data->begin;
  398. d->in_buf->data->len = 0;
  399. }
  400. else {
  401. /* Otherwise we need to move buffer */
  402. /* Reinit pointers */
  403. len = d->in_buf->data->len - r - 1;
  404. end = d->in_buf->data->begin + r + 1;
  405. memmove (d->in_buf->data->begin, end, len);
  406. d->in_buf->data->len = len;
  407. d->in_buf->pos = d->in_buf->data->begin + len;
  408. /* Process remaining buffer */
  409. read_buffers (fd, d, TRUE);
  410. }
  411. return;
  412. }
  413. }
  414. /* Set new begin of line */
  415. b = c + 1;
  416. }
  417. r++;
  418. c++;
  419. }
  420. /* Now drain remaining characters in buffer */
  421. memmove (d->in_buf->data->begin, b, c - b);
  422. d->in_buf->data->len = c - b;
  423. d->in_buf->pos = d->in_buf->data->begin + (c - b);
  424. break;
  425. case BUFFER_CHARACTER:
  426. r = d->nchars;
  427. if ((ssize_t)len >= r) {
  428. res.begin = b;
  429. res.len = r;
  430. c = b + r;
  431. if (d->read_callback) {
  432. if (!d->read_callback (&res, d->user_data)) {
  433. return;
  434. }
  435. /* Move remaining string to begin of buffer (draining) */
  436. if ((ssize_t)len > r) {
  437. len -= r;
  438. memmove (d->in_buf->data->begin, c, len);
  439. d->in_buf->data->len = len;
  440. d->in_buf->pos = d->in_buf->data->begin + len;
  441. b = d->in_buf->data->begin;
  442. }
  443. else {
  444. d->in_buf->data->len = 0;
  445. d->in_buf->pos = d->in_buf->data->begin;
  446. }
  447. if (d->policy != saved_policy && (ssize_t)len != r) {
  448. debug_ip("policy changed during callback, restart buffer's processing");
  449. read_buffers (fd, d, TRUE);
  450. return;
  451. }
  452. }
  453. }
  454. break;
  455. case BUFFER_ANY:
  456. res.begin = d->in_buf->data->begin;
  457. res.len = len;
  458. if (d->read_callback) {
  459. /*
  460. * Actually we do not want to send zero sized
  461. * buffers to a read callback
  462. */
  463. if (! (d->want_read && res.len == 0)) {
  464. if (!d->read_callback (&res, d->user_data)) {
  465. return;
  466. }
  467. }
  468. if (d->policy != saved_policy) {
  469. debug_ip("policy changed during callback, restart buffer's processing");
  470. read_buffers (fd, d, TRUE);
  471. return;
  472. }
  473. }
  474. d->in_buf->pos = d->in_buf->data->begin;
  475. d->in_buf->data->len = 0;
  476. break;
  477. }
  478. }
  479. #undef BUFREMAIN
  480. static void
  481. dispatcher_cb (gint fd, short what, void *arg)
  482. {
  483. rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *) arg;
  484. GError *err = NULL;
  485. debug_ip("in dispatcher callback, what: %d, fd: %d", (gint)what, fd);
  486. if ((what & EV_TIMEOUT) != 0) {
  487. if (d->err_callback) {
  488. err = g_error_new (G_DISPATCHER_ERROR, ETIMEDOUT, "IO timeout");
  489. d->err_callback (err, d->user_data);
  490. }
  491. }
  492. else if ((what & EV_READ) != 0) {
  493. read_buffers (fd, d, FALSE);
  494. }
  495. else if ((what & EV_WRITE) != 0) {
  496. /* No data to write, disable further EV_WRITE to this fd */
  497. if (d->in_sendfile) {
  498. sendfile_callback (d);
  499. }
  500. else {
  501. if (d->out_buffers.pending == 0) {
  502. if (d->half_closed && !d->is_restored) {
  503. /* Socket is half closed and there is nothing more to write, closing connection */
  504. if (d->err_callback) {
  505. err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
  506. d->err_callback (err, d->user_data);
  507. return;
  508. }
  509. }
  510. else {
  511. /* Want read again */
  512. event_del (d->ev);
  513. event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
  514. event_base_set (d->ev_base, d->ev);
  515. event_add (d->ev, d->tv);
  516. if (d->is_restored && d->write_callback) {
  517. if (!d->write_callback (d->user_data)) {
  518. return;
  519. }
  520. d->is_restored = FALSE;
  521. }
  522. }
  523. }
  524. else {
  525. /* Delayed write */
  526. write_buffers (fd, d, TRUE);
  527. }
  528. }
  529. }
  530. }
  531. rspamd_io_dispatcher_t *
  532. rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy policy,
  533. dispatcher_read_callback_t read_cb, dispatcher_write_callback_t write_cb, dispatcher_err_callback_t err_cb, struct timeval *tv, void *user_data)
  534. {
  535. rspamd_io_dispatcher_t *new;
  536. if (fd == -1) {
  537. return NULL;
  538. }
  539. new = g_slice_alloc0 (sizeof (rspamd_io_dispatcher_t));
  540. new->pool = memory_pool_new (memory_pool_get_size ());
  541. if (tv != NULL) {
  542. new->tv = memory_pool_alloc (new->pool, sizeof (struct timeval));
  543. memcpy (new->tv, tv, sizeof (struct timeval));
  544. }
  545. else {
  546. new->tv = NULL;
  547. }
  548. new->nchars = 0;
  549. new->in_sendfile = FALSE;
  550. new->policy = policy;
  551. new->read_callback = read_cb;
  552. new->write_callback = write_cb;
  553. new->err_callback = err_cb;
  554. new->user_data = user_data;
  555. new->strip_eol = TRUE;
  556. new->half_closed = FALSE;
  557. new->want_read = TRUE;
  558. new->is_restored = FALSE;
  559. new->default_buf_size = sysconf (_SC_PAGESIZE);
  560. new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
  561. new->fd = fd;
  562. new->ev_base = base;
  563. event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
  564. event_base_set (new->ev_base, new->ev);
  565. event_add (new->ev, new->tv);
  566. return new;
  567. }
  568. void
  569. rspamd_remove_dispatcher (rspamd_io_dispatcher_t * d)
  570. {
  571. struct rspamd_out_buffer_s *cur, *tmp;
  572. if (d != NULL) {
  573. DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
  574. DELETE_OUT_BUFFER (d, cur);
  575. }
  576. event_del (d->ev);
  577. memory_pool_delete (d->pool);
  578. g_slice_free1 (sizeof (rspamd_io_dispatcher_t), d);
  579. }
  580. }
  581. void
  582. rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy, size_t nchars)
  583. {
  584. f_str_t *tmp;
  585. gint t;
  586. if (d->policy != policy || nchars != d->nchars) {
  587. d->policy = policy;
  588. d->nchars = nchars ? nchars : d->default_buf_size;
  589. /* Resize input buffer if needed */
  590. if (policy == BUFFER_CHARACTER && nchars != 0) {
  591. if (d->in_buf && d->in_buf->data->size < nchars) {
  592. tmp = fstralloc_tmp (d->pool, d->nchars + 1);
  593. memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
  594. t = d->in_buf->pos - d->in_buf->data->begin;
  595. tmp->len = d->in_buf->data->len;
  596. d->in_buf->data = tmp;
  597. d->in_buf->pos = d->in_buf->data->begin + t;
  598. }
  599. }
  600. else if (policy == BUFFER_LINE || policy == BUFFER_ANY) {
  601. if (d->in_buf && d->nchars < d->default_buf_size) {
  602. tmp = fstralloc_tmp (d->pool, d->default_buf_size);
  603. memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
  604. t = d->in_buf->pos - d->in_buf->data->begin;
  605. tmp->len = d->in_buf->data->len;
  606. d->in_buf->data = tmp;
  607. d->in_buf->pos = d->in_buf->data->begin + t;
  608. }
  609. d->strip_eol = TRUE;
  610. }
  611. }
  612. debug_ip("new input length watermark is %uz", d->nchars);
  613. }
  614. gboolean
  615. rspamd_dispatcher_write (rspamd_io_dispatcher_t * d,
  616. const void *data, size_t len, gboolean delayed, gboolean allocated)
  617. {
  618. struct rspamd_out_buffer_s *newbuf;
  619. newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s));
  620. if (len == 0) {
  621. /* Assume NULL terminated */
  622. len = strlen ((const gchar *)data);
  623. }
  624. if (!allocated) {
  625. newbuf->data = g_string_new_len (data, len);
  626. newbuf->allocated = TRUE;
  627. }
  628. else {
  629. newbuf->data = g_string_new (NULL);
  630. newbuf->data->str = (gchar *)data;
  631. newbuf->data->len = len;
  632. newbuf->data->allocated_len = len;
  633. newbuf->allocated = FALSE;
  634. }
  635. APPEND_OUT_BUFFER (d, newbuf);
  636. if (!delayed) {
  637. debug_ip("plan write event");
  638. return write_buffers (d->fd, d, FALSE);
  639. }
  640. /* Otherwise plan write event */
  641. event_del (d->ev);
  642. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  643. event_base_set (d->ev_base, d->ev);
  644. event_add (d->ev, d->tv);
  645. return TRUE;
  646. }
  647. gboolean rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d,
  648. GString *str,
  649. gboolean delayed,
  650. gboolean free_on_write)
  651. {
  652. struct rspamd_out_buffer_s *newbuf;
  653. newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s));
  654. newbuf->data = str;
  655. newbuf->allocated = free_on_write;
  656. APPEND_OUT_BUFFER (d, newbuf);
  657. if (!delayed) {
  658. debug_ip("plan write event");
  659. return write_buffers (d->fd, d, FALSE);
  660. }
  661. /* Otherwise plan write event */
  662. event_del (d->ev);
  663. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  664. event_base_set (d->ev_base, d->ev);
  665. event_add (d->ev, d->tv);
  666. return TRUE;
  667. }
  668. gboolean
  669. rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, gint fd, size_t len)
  670. {
  671. if (lseek (fd, 0, SEEK_SET) == -1) {
  672. msg_warn ("lseek failed: %s", strerror (errno));
  673. return FALSE;
  674. }
  675. d->offset = 0;
  676. d->in_sendfile = TRUE;
  677. d->sendfile_fd = fd;
  678. d->file_size = len;
  679. #ifndef HAVE_SENDFILE
  680. #ifdef HAVE_MMAP_NOCORE
  681. if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd, 0)) == MAP_FAILED) {
  682. #else
  683. if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) {
  684. #endif
  685. msg_warn ("mmap failed: %s", strerror (errno));
  686. return FALSE;
  687. }
  688. #endif
  689. return sendfile_callback (d);
  690. }
  691. void
  692. rspamd_dispatcher_pause (rspamd_io_dispatcher_t * d)
  693. {
  694. debug_ip ("paused dispatcher");
  695. event_del (d->ev);
  696. d->is_restored = FALSE;
  697. }
  698. void
  699. rspamd_dispatcher_restore (rspamd_io_dispatcher_t * d)
  700. {
  701. if (!d->is_restored) {
  702. debug_ip ("restored dispatcher");
  703. event_del (d->ev);
  704. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, d);
  705. event_base_set (d->ev_base, d->ev);
  706. event_add (d->ev, d->tv);
  707. d->is_restored = TRUE;
  708. }
  709. }
  710. void
  711. rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *d)
  712. {
  713. struct rspamd_out_buffer_s *cur, *tmp;
  714. DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
  715. DELETE_OUT_BUFFER (d, cur);
  716. }
  717. /* Cleanup temporary data */
  718. memory_pool_cleanup_tmp (d->pool);
  719. d->in_buf = NULL;
  720. }
  721. #undef debug_ip
  722. /*
  723. * vi:ts=4
  724. */