1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
/**
* @file buffer.h
* Implements buffered IO
*/
#ifndef RSPAMD_BUFFER_H
#define RSPAMD_BUFFER_H
#include "config.h"
#include "mem_pool.h"
#include "fstring.h"
typedef gboolean (*dispatcher_read_callback_t)(f_str_t *in, void *user_data);
typedef gboolean (*dispatcher_write_callback_t)(void *user_data);
typedef void (*dispatcher_err_callback_t)(GError *err, void *user_data);
/**
* Types of IO handling
*/
enum io_policy {
BUFFER_LINE, /**< call handler when we have line ready */
BUFFER_CHARACTER, /**< call handler when we have some characters */
BUFFER_ANY /**< call handler whenever we got data in buffer */
};
/**
* Buffer structure
*/
typedef struct rspamd_buffer_s {
f_str_t *data; /**< buffer logic */
gchar *pos; /**< current position */
} rspamd_buffer_t;
typedef struct rspamd_io_dispatcher_s {
rspamd_buffer_t *in_buf; /**< input buffer */
GQueue *out_buffers; /**< out buffers chain */
struct timeval *tv; /**< io timeout */
struct event *ev; /**< libevent io event */
memory_pool_t *pool; /**< where to store data */
enum io_policy policy; /**< IO policy */
size_t nchars; /**< how many chars to read */
gint fd; /**< descriptor */
guint32 peer_addr; /**< address of peer for debugging */
gboolean wanna_die; /**< if dispatcher should be stopped */
dispatcher_read_callback_t read_callback; /**< read callback */
dispatcher_write_callback_t write_callback; /**< write callback */
dispatcher_err_callback_t err_callback; /**< error callback */
void *user_data; /**< user's data for callbacks */
off_t offset; /**< for sendfile use */
size_t file_size;
gint sendfile_fd;
gboolean in_sendfile; /**< whether buffer is in sendfile mode */
gboolean strip_eol; /**< strip or not line ends in BUFFER_LINE policy */
gboolean is_restored; /**< call a callback when dispatcher is restored */
struct event_base *ev_base; /**< event base for io operations */
#ifndef HAVE_SENDFILE
void *map;
#endif
} rspamd_io_dispatcher_t;
/**
* Creates rspamd IO dispatcher for specified descriptor
* @param fd descriptor to IO
* @param policy IO policy
* @param read_cb read callback handler
* @param write_cb write callback handler
* @param err_cb error callback handler
* @param tv IO timeout
* @param user_data pointer to user's data
* @return new dispatcher object or NULL in case of failure
*/
rspamd_io_dispatcher_t* rspamd_create_dispatcher (struct event_base *base, gint fd,
enum io_policy policy,
dispatcher_read_callback_t read_cb,
dispatcher_write_callback_t write_cb,
dispatcher_err_callback_t err_cb,
struct timeval *tv,
void *user_data);
/**
* Set new policy for dispatcher
* @param d pointer to dispatcher's object
* @param policy IO policy
* @param nchars number of characters in buffer for character policy
*/
void rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
enum io_policy policy,
size_t nchars);
/**
* Write data when it would be possible
* @param d pointer to dispatcher's object
* @param data data to write
* @param len length of data
*/
gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
void *data,
size_t len, gboolean delayed, gboolean allocated) G_GNUC_WARN_UNUSED_RESULT;
/**
* Send specified descriptor to dispatcher
* @param d pointer to dispatcher's object
* @param fd descriptor of file
* @param len length of data
*/
gboolean rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, gint fd, size_t len) G_GNUC_WARN_UNUSED_RESULT;
/**
* Pause IO events on dispatcher
* @param d pointer to dispatcher's object
*/
void rspamd_dispatcher_pause (rspamd_io_dispatcher_t *d);
/**
* Restore IO events on dispatcher
* @param d pointer to dispatcher's object
*/
void rspamd_dispatcher_restore (rspamd_io_dispatcher_t *d);
/**
* Frees dispatcher object
* @param dispatcher pointer to dispatcher's object
*/
void rspamd_remove_dispatcher (rspamd_io_dispatcher_t *dispatcher);
/**
* Cleanup dispatcher freeing all temporary data
* @param dispatcher pointer to dispatcher's object
*/
void rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *dispatcher);
#endif
|