summaryrefslogtreecommitdiffstats
path: root/src/buffer.h
blob: 5ed42bfb367268b314e9e2d59ecf19d94d95c2b9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/**
 * @file buffer.h
 * Implements buffered IO
 */

#ifndef RSPAMD_BUFFER_H
#define RSPAMD_BUFFER_H

#include "config.h"
#include "mem_pool.h"
#include "fstring.h"

typedef gboolean (*dispatcher_read_callback_t)(f_str_t *in, void *user_data);
typedef gboolean (*dispatcher_write_callback_t)(void *user_data);
typedef void (*dispatcher_err_callback_t)(GError *err, void *user_data);

/**
 * Types of IO handling
 */
enum io_policy {
	BUFFER_LINE,													/**< call handler when we have line ready */
	BUFFER_CHARACTER,												/**< call handler when we have some characters */
	BUFFER_ANY														/**< call handler whenever we got data in buffer */
};

/**
 * Buffer structure
 */
typedef struct rspamd_buffer_s {
	f_str_t *data;													/**< buffer logic			*/
	gchar *pos;														/**< current position		*/
} rspamd_buffer_t;

struct rspamd_out_buffer_s {
	GString *data;
	gboolean allocated;
	struct rspamd_out_buffer_s *prev, *next;
};

typedef struct rspamd_io_dispatcher_s {
	rspamd_buffer_t *in_buf;										/**< input buffer			*/
	struct {
		guint pending;
		struct rspamd_out_buffer_s *buffers;
	} out_buffers;													/**< output buffers chain	*/
	struct timeval *tv;												/**< io timeout				*/
	struct event *ev;												/**< libevent io event		*/
	rspamd_mempool_t *pool;											/**< where to store data	*/
	enum io_policy policy;											/**< IO policy				*/
	size_t nchars;													/**< how many chars to read	*/
	gint fd;															/**< descriptor				*/
	guint32 peer_addr;												/**< address of peer for debugging */
	gboolean wanna_die;												/**< if dispatcher should be stopped */
	dispatcher_read_callback_t read_callback;						/**< read callback			*/
	dispatcher_write_callback_t write_callback;						/**< write callback			*/
	dispatcher_err_callback_t err_callback;							/**< error callback			*/
	void *user_data;												/**< user's data for callbacks */
	gulong default_buf_size;										/**< default size for buffering */
	off_t offset;													/**< for sendfile use		*/
	size_t file_size;
	gint sendfile_fd;
	gboolean in_sendfile;											/**< whether buffer is in sendfile mode */
	gboolean strip_eol;												/**< strip or not line ends in BUFFER_LINE policy */
	gboolean is_restored;											/**< call a callback when dispatcher is restored */
	gboolean half_closed;											/**< connection is half closed */
	gboolean want_read;												/**< whether we want to read more data */
	struct event_base *ev_base;										/**< event base for io operations */
#ifndef HAVE_SENDFILE
	void *map;
#endif
} rspamd_io_dispatcher_t;

/**
 * Creates rspamd IO dispatcher for specified descriptor
 * @param fd descriptor to IO
 * @param policy IO policy
 * @param read_cb read callback handler
 * @param write_cb write callback handler
 * @param err_cb error callback handler
 * @param tv IO timeout
 * @param user_data pointer to user's data
 * @return new dispatcher object or NULL in case of failure
 */
rspamd_io_dispatcher_t* rspamd_create_dispatcher (struct event_base *base, gint fd,
												  enum io_policy policy,
												  dispatcher_read_callback_t read_cb,
												  dispatcher_write_callback_t write_cb,
												  dispatcher_err_callback_t err_cb,
												  struct timeval *tv,
												  void *user_data);

/**
 * Set new policy for dispatcher
 * @param d pointer to dispatcher's object
 * @param policy IO policy
 * @param nchars number of characters in buffer for character policy
 */
void rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d, 
												  enum io_policy policy,
												  size_t nchars);

/**
 * Write data when it would be possible
 * @param d pointer to dispatcher's object
 * @param data data to write
 * @param len length of data
 */
gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
												  const void *data,
												  size_t len, gboolean delayed,
												  gboolean allocated) G_GNUC_WARN_UNUSED_RESULT;

/**
 * Write a GString to dispatcher
 * @param d dipatcher object
 * @param str string to write
 * @param delayed delay write
 * @param free_on_write free string after writing to a socket
 * @return TRUE if write has been queued successfully
 */
gboolean rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d,
												  GString *str,
												  gboolean delayed,
												  gboolean free_on_write) G_GNUC_WARN_UNUSED_RESULT;

/**
 * Send specified descriptor to dispatcher
 * @param d pointer to dispatcher's object
 * @param fd descriptor of file
 * @param len length of data
 */
gboolean rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, gint fd, size_t len) G_GNUC_WARN_UNUSED_RESULT;

/**
 * Pause IO events on dispatcher
 * @param d pointer to dispatcher's object
 */
void rspamd_dispatcher_pause (rspamd_io_dispatcher_t *d);

/**
 * Restore IO events on dispatcher
 * @param d pointer to dispatcher's object
 */
void rspamd_dispatcher_restore (rspamd_io_dispatcher_t *d);

/**
 * Frees dispatcher object
 * @param dispatcher pointer to dispatcher's object
 */
void rspamd_remove_dispatcher (rspamd_io_dispatcher_t *dispatcher);

/**
 * Cleanup dispatcher freeing all temporary data
 * @param dispatcher pointer to dispatcher's object
 */
void rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *dispatcher);

#endif