aboutsummaryrefslogtreecommitdiffstats
path: root/lualib
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-09-05 11:23:18 +0100
committerGitHub <noreply@github.com>2018-09-05 11:23:18 +0100
commit3807688a67be66d00a24172c13b00b6fb1816d69 (patch)
tree0a7922e8dc8a7f75211a04f5732c58b1c51e61c7 /lualib
parent5d0a04467c748f9bf185737b33b1fbaf54e5333d (diff)
parent7c13830ca02e80e332641f66ea6b0c47c0ae8c94 (diff)
downloadrspamd-3807688a67be66d00a24172c13b00b6fb1816d69.tar.gz
rspamd-3807688a67be66d00a24172c13b00b6fb1816d69.zip
Merge pull request #2451 from negram/coroutines-tcp
Coroutines tcp
Diffstat (limited to 'lualib')
-rw-r--r--lualib/lua_tcp_sync.lua213
1 files changed, 213 insertions, 0 deletions
diff --git a/lualib/lua_tcp_sync.lua b/lualib/lua_tcp_sync.lua
new file mode 100644
index 000000000..a85a3f974
--- /dev/null
+++ b/lualib/lua_tcp_sync.lua
@@ -0,0 +1,213 @@
+local rspamd_tcp = require "rspamd_tcp"
+local lua_util = require "lua_util"
+
+local exports = {}
+local N = 'tcp_sync'
+
+local tcp_sync = {_conn = nil, _data = '', _eof = false, _addr = ''}
+local metatable = {
+ __tostring = function (self)
+ return "class {tcp_sync connect to: " .. self._addr .. "}"
+ end
+}
+
+function tcp_sync.new(connection)
+ local self = {}
+
+ for name, method in pairs(tcp_sync) do
+ if name ~= 'new' then
+ self[name] = method
+ end
+ end
+
+ self._conn = connection
+
+ setmetatable(self, metatable)
+
+ return self
+end
+
+--[[[
+-- @method tcp_sync.read_once()
+--
+-- Acts exactly like low-level tcp_sync.read_once()
+-- the only exception is that if there is some pending data,
+-- it's returned immediately and no underlying call is performed
+--
+-- @return
+-- true, {data} if everything is fine
+-- false, {error message} otherwise
+--
+--]]
+function tcp_sync:read_once()
+ local is_ok, data
+ if self._data:len() > 0 then
+ data = self._data
+ self._data = nil
+ return true, data
+ end
+
+ is_ok, data = self._conn:read_once()
+
+ return is_ok, data
+end
+
+--[[[
+-- @method tcp_sync.read_until(pattern)
+--
+-- Reads data from the connection until pattern is found
+-- returns all bytes before the pattern
+--
+-- @param {pattern} Read data until pattern is found
+-- @return
+-- true, {data} if everything is fine
+-- false, {error message} otherwise
+-- @example
+--
+--]]
+function tcp_sync:read_until(pattern)
+ repeat
+ local pos_start, pos_end = self._data:find(pattern, 1, true)
+ if pos_start then
+ local data = self._data:sub(1, pos_start - 1)
+ self._data = self._data:sub(pos_end + 1)
+ return true, data
+ end
+
+ local is_ok, more_data = self._conn:read_once()
+ if not is_ok then
+ return is_ok, more_data
+ end
+
+ self._data = self._data .. more_data
+ until false
+end
+
+--[[[
+-- @method tcp_sync.read_bytes(n)
+--
+-- Reads {n} bytes from the stream
+--
+-- @param {n} Number of bytes to read
+-- @return
+-- true, {data} if everything is fine
+-- false, {error message} otherwise
+--
+--]]
+function tcp_sync:read_bytes(n)
+ repeat
+ if self._data:len() >= n then
+ local data = self._data:sub(1, n)
+ self._data = self._data:sub(n + 1)
+ return true, data
+ end
+
+ local is_ok, more_data = self._conn:read_once()
+ if not is_ok then
+ return is_ok, more_data
+ end
+
+ self._data = self._data .. more_data
+ until false
+end
+
+--[[[
+-- @method tcp_sync.read_until_eof(n)
+--
+-- Reads stream until EOF is reached
+--
+-- @return
+-- true, {data} if everything is fine
+-- false, {error message} otherwise
+--
+--]]
+function tcp_sync:read_until_eof()
+ while not self:eof() do
+ local is_ok, more_data = self._conn:read_once()
+ if not is_ok then
+ if self:eof() then
+ -- this error is EOF (connection terminated)
+ -- exactly what we were waiting for
+ break
+ end
+ return is_ok, more_data
+ end
+ self._data = self._data .. more_data
+ end
+
+ local data = self._data
+ self._data = ''
+ return true, data
+end
+
+--[[[
+-- @method tcp_sync.write(n)
+--
+-- Writes data into the stream.
+--
+-- @return
+-- true if everything is fine
+-- false, {error message} otherwise
+--
+--]]
+function tcp_sync:write(data)
+ return self._conn:write(data)
+end
+
+--[[[
+-- @method tcp_sync.close()
+--
+-- Closes the connection. If the connection was created with task,
+-- this method is called automatically as soon as the task is done
+-- Calling this method helps to prevent connections leak.
+-- The object is finally destroyed by garbage collector.
+--
+-- @return
+--
+--]]
+function tcp_sync:close()
+ return self._conn:close()
+end
+
+--[[[
+-- @method tcp_sync.eof()
+--
+-- @return
+-- true if last "read" operation ended with EOF
+-- false otherwise
+--
+--]]
+function tcp_sync:eof()
+ if not self._eof and self._conn:eof() then
+ self._eof = true
+ end
+ return self._eof
+end
+
+--[[[
+-- @function tcp_sync.shutdown(n)
+--
+-- half-close socket
+--
+-- @return
+--
+--]]
+function tcp_sync:shutdown()
+ return self._conn:shutdown()
+end
+
+exports.connect = function (args)
+ local is_ok, connection = rspamd_tcp.connect_sync(args)
+ if not is_ok then
+ return is_ok, connection
+ end
+
+ local instance = tcp_sync.new(connection)
+ instance._addr = string.format("%s:%s", tostring(args.host), tostring(args.port))
+
+ lua_util.debugm(N, args.task, 'Connected to %s', instance._addr)
+
+ return true, instance
+end
+
+return exports \ No newline at end of file