You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

buffer.c 17KB


  1. /*
  2. * Copyright (c) 2009, Rambler media
  3. * All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. * * Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * * Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. *
  13. * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
  14. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  15. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  16. * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
  17. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  18. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  19. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  20. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  21. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  22. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  23. */
  24. #include "config.h"
  25. #include "buffer.h"
  26. #include "main.h"
  27. #define G_DISPATCHER_ERROR dispatcher_error_quark()
  28. static void dispatcher_cb (int fd, short what, void *arg);
  29. static inline GQuark
  30. dispatcher_error_quark (void)
  31. {
  32. return g_quark_from_static_string ("g-dispatcher-error-quark");
  33. }
  34. static gboolean
  35. sendfile_callback (rspamd_io_dispatcher_t *d)
  36. {
  37. ssize_t r;
  38. GError *err;
  39. #ifdef HAVE_SENDFILE
  40. #if defined(FREEBSD) || defined(DARWIN)
  41. off_t off = 0;
  42. /* FreeBSD version */
  43. if (sendfile (d->sendfile_fd, d->fd, d->offset, 0, 0, &off, 0) != 0) {
  44. if (errno != EAGAIN) {
  45. if (d->err_callback) {
  46. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  47. d->err_callback (err, d->user_data);
  48. return FALSE;
  49. }
  50. }
  51. else {
  52. debug_ip (d->peer_addr, "partially write data, retry");
  53. /* Wait for other event */
  54. d->offset += off;
  55. event_del (d->ev);
  56. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  57. event_add (d->ev, d->tv);
  58. }
  59. }
  60. else {
  61. if (d->write_callback) {
  62. if (!d->write_callback (d->user_data)) {
  63. debug_ip (d->peer_addr, "callback set wanna_die flag, terminating");
  64. return FALSE;
  65. }
  66. }
  67. event_del (d->ev);
  68. event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
  69. event_add (d->ev, d->tv);
  70. d->in_sendfile = FALSE;
  71. }
  72. #else
  73. /* Linux version */
  74. r = sendfile (d->fd, d->sendfile_fd, &d->offset, d->file_size);
  75. if (r == -1) {
  76. if (errno != EAGAIN) {
  77. if (d->err_callback) {
  78. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  79. d->err_callback (err, d->user_data);
  80. return FALSE;
  81. }
  82. }
  83. else {
  84. debug_ip (d->peer_addr, "partially write data, retry");
  85. /* Wait for other event */
  86. event_del (d->ev);
  87. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  88. event_add (d->ev, d->tv);
  89. }
  90. }
  91. else if (r + d->offset < d->file_size) {
  92. debug_ip (d->peer_addr, "partially write data, retry");
  93. /* Wait for other event */
  94. event_del (d->ev);
  95. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  96. event_add (d->ev, d->tv);
  97. }
  98. else {
  99. if (d->write_callback) {
  100. if (!d->write_callback (d->user_data)) {
  101. debug_ip (d->peer_addr, "callback set wanna_die flag, terminating");
  102. return FALSE;
  103. }
  104. }
  105. event_del (d->ev);
  106. event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
  107. event_add (d->ev, d->tv);
  108. d->in_sendfile = FALSE;
  109. }
  110. #endif
  111. #else
  112. r = write (d->fd, d->map, d->file_size - d->offset);
  113. if (r == -1) {
  114. if (errno != EAGAIN) {
  115. if (d->err_callback) {
  116. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  117. d->err_callback (err, d->user_data);
  118. return FALSE;
  119. }
  120. }
  121. else {
  122. debug_ip (d->peer_addr, "partially write data, retry");
  123. /* Wait for other event */
  124. event_del (d->ev);
  125. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  126. event_add (d->ev, d->tv);
  127. }
  128. }
  129. else if (r + d->offset < d->file_size) {
  130. d->offset += r;
  131. debug_ip (d->peer_addr, "partially write data, retry");
  132. /* Wait for other event */
  133. event_del (d->ev);
  134. event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
  135. event_add (d->ev, d->tv);
  136. }
  137. else {
  138. if (d->write_callback) {
  139. if (!d->write_callback (d->user_data)) {
  140. debug_ip (d->peer_addr, "callback set wanna_die flag, terminating");
  141. return FALSE;
  142. }
  143. }
  144. event_del (d->ev);
  145. event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
  146. event_add (d->ev, d->tv);
  147. d->in_sendfile = FALSE;
  148. }
  149. #endif
  150. return TRUE;
  151. }
  152. #define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
  153. static gboolean
  154. write_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
  155. {
  156. GList *cur;
  157. GError *err;
  158. rspamd_buffer_t *buf;
  159. ssize_t r;
  160. /* Fix order */
  161. if (d->out_buffers) {
  162. d->out_buffers = g_list_reverse (d->out_buffers);
  163. }
  164. cur = g_list_first (d->out_buffers);
  165. while (cur) {
  166. buf = (rspamd_buffer_t *) cur->data;
  167. if (BUFREMAIN (buf) == 0) {
  168. /* Skip empty buffers */
  169. cur = g_list_next (cur);
  170. continue;
  171. }
  172. r = write (fd, buf->pos, BUFREMAIN (buf));
  173. if (r == -1 && errno != EAGAIN) {
  174. if (d->err_callback) {
  175. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  176. d->err_callback (err, d->user_data);
  177. return FALSE;
  178. }
  179. }
  180. else if (r > 0) {
  181. buf->pos += r;
  182. if (BUFREMAIN (buf) != 0) {
  183. /* Continue with this buffer */
  184. debug_ip (d->peer_addr, "wrote %z bytes of %z", r, buf->data->len);
  185. continue;
  186. }
  187. }
  188. else if (r == 0) {
  189. /* Got EOF while we wait for data */
  190. if (d->err_callback) {
  191. err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
  192. d->err_callback (err, d->user_data);
  193. return FALSE;
  194. }
  195. }
  196. else if (r == -1 && errno == EAGAIN) {
  197. debug_ip (d->peer_addr, "partially write data, retry");
  198. /* Wait for other event */
  199. event_del (d->ev);
  200. event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
  201. event_add (d->ev, d->tv);
  202. return TRUE;
  203. }
  204. cur = g_list_next (cur);
  205. }
  206. if (cur == NULL) {
  207. /* Disable write event for this time */
  208. g_list_free (d->out_buffers);
  209. d->out_buffers = NULL;
  210. debug_ip (d->peer_addr, "all buffers were written successfully");
  211. if (is_delayed && d->write_callback) {
  212. if (!d->write_callback (d->user_data)) {
  213. debug_ip (d->peer_addr, "callback set wanna_die flag, terminating");
  214. return FALSE;
  215. }
  216. }
  217. event_del (d->ev);
  218. event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
  219. event_add (d->ev, d->tv);
  220. }
  221. else {
  222. /* Plan other write event */
  223. event_del (d->ev);
  224. event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
  225. event_add (d->ev, d->tv);
  226. }
  227. return TRUE;
  228. }
  229. static void
  230. read_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
  231. {
  232. ssize_t r;
  233. GError *err;
  234. f_str_t res;
  235. char *c, *b;
  236. char *end;
  237. size_t len;
  238. enum io_policy saved_policy;
  239. if (d->wanna_die) {
  240. rspamd_remove_dispatcher (d);
  241. return;
  242. }
  243. if (d->in_buf == NULL) {
  244. d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
  245. if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) {
  246. d->in_buf->data = fstralloc (d->pool, BUFSIZ);
  247. }
  248. else {
  249. d->in_buf->data = fstralloc (d->pool, d->nchars);
  250. }
  251. d->in_buf->pos = d->in_buf->data->begin;
  252. }
  253. end = d->in_buf->pos;
  254. len = d->in_buf->data->len;
  255. if (BUFREMAIN (d->in_buf) == 0) {
  256. /* Buffer is full, try to call callback with overflow error */
  257. if (d->err_callback) {
  258. err = g_error_new (G_DISPATCHER_ERROR, E2BIG, "buffer overflow");
  259. d->err_callback (err, d->user_data);
  260. return;
  261. }
  262. }
  263. else if (!skip_read) {
  264. /* Try to read the whole buffer */
  265. r = read (fd, end, BUFREMAIN (d->in_buf));
  266. if (r == -1 && errno != EAGAIN) {
  267. if (d->err_callback) {
  268. err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
  269. d->err_callback (err, d->user_data);
  270. return;
  271. }
  272. }
  273. else if (r == 0) {
  274. /* Got EOF while we wait for data */
  275. if (d->err_callback) {
  276. err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
  277. d->err_callback (err, d->user_data);
  278. return;
  279. }
  280. }
  281. else if (r == -1 && errno == EAGAIN) {
  282. debug_ip (d->peer_addr, "partially read data, retry");
  283. return;
  284. }
  285. else {
  286. /* Set current position in buffer */
  287. d->in_buf->pos += r;
  288. d->in_buf->data->len += r;
  289. }
  290. debug_ip (d->peer_addr, "read %z characters, policy is %s, watermark is: %z", r,
  291. d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", d->nchars);
  292. }
  293. saved_policy = d->policy;
  294. c = d->in_buf->data->begin;
  295. end = d->in_buf->pos;
  296. len = d->in_buf->data->len;
  297. b = c;
  298. r = 0;
  299. switch (d->policy) {
  300. case BUFFER_LINE:
  301. /** Variables:
  302. * b - begin of line
  303. * r - current position in buffer
  304. * *len - length of remaining buffer
  305. * c - pointer to current position (buffer->begin + r)
  306. * res - result string
  307. */
  308. while (r < len) {
  309. if (*c == '\n') {
  310. res.begin = b;
  311. res.len = c - b;
  312. /* Strip EOL */
  313. if (d->strip_eol) {
  314. if (r != 0 && *(c - 1) == '\r') {
  315. res.len--;
  316. }
  317. }
  318. else {
  319. /* Include EOL in reply */
  320. res.len ++;
  321. }
  322. /* Call callback for a line */
  323. if (d->read_callback) {
  324. if (!d->read_callback (&res, d->user_data)) {
  325. return;
  326. }
  327. if (d->policy != saved_policy) {
  328. /* Drain buffer as policy is changed */
  329. /* Note that d->in_buffer is other pointer now, so we need to reinit all pointers */
  330. /* First detect how much symbols do we have */
  331. if (end == c) {
  332. /* In fact we read the whole buffer and change input policy, so just set current pos to begin of buffer */
  333. d->in_buf->pos = d->in_buf->data->begin;
  334. d->in_buf->data->len = 0;
  335. }
  336. else {
  337. /* Otherwise we need to move buffer */
  338. /* Reinit pointers */
  339. len = d->in_buf->data->len - r - 1;
  340. end = d->in_buf->data->begin + r + 1;
  341. memmove (d->in_buf->data->begin, end, len);
  342. d->in_buf->data->len = len;
  343. d->in_buf->pos = d->in_buf->data->begin + len;
  344. /* Process remaining buffer */
  345. read_buffers (fd, d, TRUE);
  346. }
  347. return;
  348. }
  349. }
  350. /* Set new begin of line */
  351. b = c + 1;
  352. }
  353. r++;
  354. c++;
  355. }
  356. /* Now drain remaining characters in buffer */
  357. memmove (d->in_buf->data->begin, b, c - b);
  358. d->in_buf->data->len = c - b;
  359. d->in_buf->pos = d->in_buf->data->begin + (c - b);
  360. break;
  361. case BUFFER_CHARACTER:
  362. r = d->nchars;
  363. if (len >= r) {
  364. res.begin = b;
  365. res.len = r;
  366. c = b + r;
  367. if (d->read_callback) {
  368. if (!d->read_callback (&res, d->user_data)) {
  369. return;
  370. }
  371. /* Move remaining string to begin of buffer (draining) */
  372. if (len > r) {
  373. len -= r;
  374. memmove (d->in_buf->data->begin, c, len);
  375. d->in_buf->data->len = len;
  376. d->in_buf->pos = d->in_buf->data->begin + len;
  377. b = d->in_buf->data->begin;
  378. c = b;
  379. }
  380. if (d->policy != saved_policy && len != r) {
  381. debug_ip (d->peer_addr, "policy changed during callback, restart buffer's processing");
  382. read_buffers (fd, d, TRUE);
  383. return;
  384. }
  385. }
  386. }
  387. break;
  388. case BUFFER_ANY:
  389. res.begin = d->in_buf->data->begin;
  390. res.len = len;
  391. if (d->read_callback) {
  392. if (!d->read_callback (&res, d->user_data)) {
  393. return;
  394. }
  395. if (d->policy != saved_policy) {
  396. debug_ip (d->peer_addr, "policy changed during callback, restart buffer's processing");
  397. read_buffers (fd, d, TRUE);
  398. return;
  399. }
  400. }
  401. d->in_buf->pos = d->in_buf->data->begin;
  402. d->in_buf->data->len = 0;
  403. break;
  404. }
  405. }
  406. #undef BUFREMAIN
  407. static void
  408. dispatcher_cb (int fd, short what, void *arg)
  409. {
  410. rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *) arg;
  411. GError *err;
  412. debug_ip (d->peer_addr, "in dispatcher callback, what: %d, fd: %d", (int)what, fd);
  413. switch (what) {
  414. case EV_TIMEOUT:
  415. if (d->err_callback) {
  416. err = g_error_new (G_DISPATCHER_ERROR, ETIMEDOUT, "IO timeout");
  417. d->err_callback (err, d->user_data);
  418. }
  419. break;
  420. case EV_WRITE:
  421. /* No data to write, disable further EV_WRITE to this fd */
  422. if (d->in_sendfile) {
  423. sendfile_callback (d);
  424. }
  425. else {
  426. if (d->out_buffers == NULL) {
  427. event_del (d->ev);
  428. event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
  429. event_add (d->ev, d->tv);
  430. }
  431. else {
  432. /* Delayed write */
  433. write_buffers (fd, d, TRUE);
  434. }
  435. }
  436. break;
  437. case EV_READ:
  438. read_buffers (fd, d, FALSE);
  439. break;
  440. }
  441. }
  442. rspamd_io_dispatcher_t *
  443. rspamd_create_dispatcher (int fd, enum io_policy policy,
  444. dispatcher_read_callback_t read_cb, dispatcher_write_callback_t write_cb, dispatcher_err_callback_t err_cb, struct timeval *tv, void *user_data)
  445. {
  446. rspamd_io_dispatcher_t *new;
  447. if (fd == -1) {
  448. return NULL;
  449. }
  450. new = g_malloc (sizeof (rspamd_io_dispatcher_t));
  451. bzero (new, sizeof (rspamd_io_dispatcher_t));
  452. new->pool = memory_pool_new (memory_pool_get_size ());
  453. if (tv != NULL) {
  454. new->tv = memory_pool_alloc (new->pool, sizeof (struct timeval));
  455. memcpy (new->tv, tv, sizeof (struct timeval));
  456. }
  457. else {
  458. new->tv = NULL;
  459. }
  460. new->nchars = 0;
  461. new->in_sendfile = FALSE;
  462. new->policy = policy;
  463. new->read_callback = read_cb;
  464. new->write_callback = write_cb;
  465. new->err_callback = err_cb;
  466. new->user_data = user_data;
  467. new->strip_eol = TRUE;
  468. new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
  469. new->fd = fd;
  470. event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
  471. event_add (new->ev, new->tv);
  472. return new;
  473. }
  474. void
  475. rspamd_remove_dispatcher (rspamd_io_dispatcher_t * dispatcher)
  476. {
  477. if (dispatcher != NULL) {
  478. event_del (dispatcher->ev);
  479. memory_pool_delete (dispatcher->pool);
  480. if (dispatcher->out_buffers) {
  481. g_list_free (dispatcher->out_buffers);
  482. }
  483. g_free (dispatcher);
  484. }
  485. }
  486. void
  487. rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy, size_t nchars)
  488. {
  489. f_str_t *tmp;
  490. int t;
  491. if (d->policy != policy) {
  492. d->policy = policy;
  493. d->nchars = nchars ? nchars : BUFSIZ;
  494. /* Resize input buffer if needed */
  495. if (policy == BUFFER_CHARACTER && nchars != 0) {
  496. if (d->in_buf && d->in_buf->data->size < nchars) {
  497. tmp = fstralloc (d->pool, d->nchars);
  498. memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
  499. t = d->in_buf->pos - d->in_buf->data->begin;
  500. tmp->len = d->in_buf->data->len;
  501. d->in_buf->data = tmp;
  502. d->in_buf->pos = d->in_buf->data->begin + t;
  503. }
  504. }
  505. else if (policy == BUFFER_LINE || policy == BUFFER_ANY) {
  506. if (d->in_buf && d->nchars < BUFSIZ) {
  507. tmp = fstralloc (d->pool, BUFSIZ);
  508. memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
  509. t = d->in_buf->pos - d->in_buf->data->begin;
  510. tmp->len = d->in_buf->data->len;
  511. d->in_buf->data = tmp;
  512. d->in_buf->pos = d->in_buf->data->begin + t;
  513. }
  514. d->strip_eol = TRUE;
  515. }
  516. }
  517. debug_ip (d->peer_addr, "new input length watermark is %uz", d->nchars);
  518. }
  519. gboolean
  520. rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, void *data, size_t len, gboolean delayed, gboolean allocated)
  521. {
  522. rspamd_buffer_t *newbuf;
  523. newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
  524. if (len == 0) {
  525. /* Assume NULL terminated */
  526. len = strlen ((char *)data);
  527. }
  528. if (!allocated) {
  529. newbuf->data = fstralloc (d->pool, len);
  530. /* We need to copy data to temporary internal buffer to avoid using of stack variables */
  531. memcpy (newbuf->data->begin, data, len);
  532. }
  533. else {
  534. newbuf->data = memory_pool_alloc (d->pool, sizeof (f_str_t));
  535. newbuf->data->begin = data;
  536. newbuf->data->size = len;
  537. }
  538. newbuf->pos = newbuf->data->begin;
  539. newbuf->data->len = len;
  540. d->out_buffers = g_list_prepend (d->out_buffers, newbuf);
  541. if (!delayed) {
  542. debug_ip (d->peer_addr, "plan write event");
  543. return write_buffers (d->fd, d, FALSE);
  544. }
  545. return TRUE;
  546. }
  547. gboolean
  548. rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, int fd, size_t len)
  549. {
  550. if (lseek (fd, 0, SEEK_SET) == -1) {
  551. msg_warn ("lseek failed: %s", strerror (errno));
  552. return FALSE;
  553. }
  554. d->offset = 0;
  555. d->in_sendfile = TRUE;
  556. d->sendfile_fd = fd;
  557. d->file_size = len;
  558. #ifndef HAVE_SENDFILE
  559. #ifdef HAVE_MMAP_NOCORE
  560. if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd, 0)) == MAP_FAILED) {
  561. #else
  562. if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) {
  563. #endif
  564. msg_warn ("mmap failed: %s", strerror (errno));
  565. return FALSE;
  566. }
  567. #endif
  568. return sendfile_callback (d);
  569. }
  570. void
  571. rspamd_dispatcher_pause (rspamd_io_dispatcher_t * d)
  572. {
  573. event_del (d->ev);
  574. }
  575. void
  576. rspamd_dispatcher_restore (rspamd_io_dispatcher_t * d)
  577. {
  578. event_add (d->ev, d->tv);
  579. }
  580. /*
  581. * vi:ts=4
  582. */