aboutsummaryrefslogtreecommitdiffstats
path: root/lualib/lua_tcp_sync.lua
diff options
context:
space:
mode:
authorMikhail Galanin <mgalanin@mimecast.com>2018-08-30 16:50:55 +0100
committerMikhail Galanin <mgalanin@mimecast.com>2018-08-30 16:50:55 +0100
commita78803aeb558c0ebb9ada2a0f71f960ac31f373d (patch)
tree340ec4be7267235b779025f6d4db745ec714483b /lualib/lua_tcp_sync.lua
parent112ed7966d7fcaf7f0a16a79b0af51e7d339e744 (diff)
downloadrspamd-a78803aeb558c0ebb9ada2a0f71f960ac31f373d.tar.gz
rspamd-a78803aeb558c0ebb9ada2a0f71f960ac31f373d.zip
[Minor] Added coroutines support for TCP library
Diffstat (limited to 'lualib/lua_tcp_sync.lua')
-rw-r--r--lualib/lua_tcp_sync.lua213
1 files changed, 213 insertions, 0 deletions
diff --git a/lualib/lua_tcp_sync.lua b/lualib/lua_tcp_sync.lua
new file mode 100644
index 000000000..b506338f1
--- /dev/null
+++ b/lualib/lua_tcp_sync.lua
@@ -0,0 +1,213 @@
+local rspamd_tcp = require "rspamd_tcp"
+local lua_util = require "lua_util"
+
+local exports = {}
+local N = 'tcp_sync'
+
+local tcp_sync = {_conn = nil, _data = '', _eof = false, _addr = ''}
+local metatable = {
+ __tostring = function (self)
+ return "class {tcp_sync connect to: " .. self._addr .. "}"
+ end
+}
+
+function tcp_sync.new(connection)
+ local self = {}
+
+ for name, method in pairs(tcp_sync) do
+ if name ~= 'new' then
+ self[name] = method
+ end
+ end
+
+ self._conn = connection
+
+ setmetatable(self, metatable)
+
+ return self
+end
+
+--[[[
+-- @function tcp_sync.read_once()
+--
+-- Acts exactly like low-level tcp_sync.read_once()
+-- the only exception is that if there is some pending data,
+-- it's returned immediately and no underlying call is performed
+--
+-- @return
+-- true, {data} if everything is fine
+-- false, {error message} otherwise
+--
+--]]
+function tcp_sync:read_once()
+ local is_ok, data
+ if self._data:len() > 0 then
+ data = self._data
+ self._data = nil
+ return true, data
+ end
+
+ is_ok, data = self._conn:read_once()
+
+ return is_ok, data
+end
+
+--[[[
+-- @function tcp_sync.read_until(pattern)
+--
+-- Reads data from the connection until pattern is found
+-- returns all bytes before the pattern
+--
+-- @param {pattern} Read data until pattern is found
+-- @return
+-- true, {data} if everything is fine
+-- false, {error message} otherwise
+-- @example
+--
+--]]
+function tcp_sync:read_until(pattern)
+ repeat
+ local pos_start, pos_end = self._data:find(pattern, 1, true)
+ if pos_start then
+ local data = self._data:sub(1, pos_start - 1)
+ self._data = self._data:sub(pos_end + 1)
+ return true, data
+ end
+
+ local is_ok, more_data = self._conn:read_once()
+ if not is_ok then
+ return is_ok, more_data
+ end
+
+ self._data = self._data .. more_data
+ until false
+end
+
+--[[[
+-- @function tcp_sync.read_bytes(n)
+--
+-- Reads {n} bytes from the stream
+--
+-- @param {n} Number of bytes to read
+-- @return
+-- true, {data} if everything is fine
+-- false, {error message} otherwise
+--
+--]]
+function tcp_sync:read_bytes(n)
+ repeat
+ if self._data:len() >= n then
+ local data = self._data:sub(1, n)
+ self._data = self._data:sub(n + 1)
+ return true, data
+ end
+
+ local is_ok, more_data = self._conn:read_once()
+ if not is_ok then
+ return is_ok, more_data
+ end
+
+ self._data = self._data .. more_data
+ until false
+end
+
+--[[[
+-- @function tcp_sync.read_until_eof(n)
+--
+-- Reads stream until EOF is reached
+--
+-- @return
+-- true, {data} if everything is fine
+-- false, {error message} otherwise
+--
+--]]
+function tcp_sync:read_until_eof()
+ while not self:eof() do
+ local is_ok, more_data = self._conn:read_once()
+ if not is_ok then
+ if self:eof() then
+ -- this error is EOF (connection terminated)
+ -- exactly what we were waiting for
+ break
+ end
+ return is_ok, more_data
+ end
+ self._data = self._data .. more_data
+ end
+
+ local data = self._data
+ self._data = ''
+ return true, data
+end
+
+--[[[
+-- @function tcp_sync.write(n)
+--
+-- Writes data into the stream.
+--
+-- @return
+-- true if everything is fine
+-- false, {error message} otherwise
+--
+--]]
+function tcp_sync:write(data)
+ return self._conn:write(data)
+end
+
+--[[[
+-- @function tcp_sync.close()
+--
+-- Closes the connection. If the connection was created with task,
+-- this method is called automatically as soon as the task is done
+-- Calling this method helps to prevent connections leak.
+-- The object is finally destroyed by garbage collector.
+--
+-- @return
+--
+--]]
+function tcp_sync:close()
+ return self._conn:close()
+end
+
+--[[[
+-- @function tcp_sync.eof()
+--
+-- @return
+-- true if last "read" operation ended with EOF
+-- false otherwise
+--
+--]]
+function tcp_sync:eof()
+ if not self._eof and self._conn:eof() then
+ self._eof = true
+ end
+ return self._eof
+end
+
+--[[[
+-- @function tcp_sync.shutdown(n)
+--
+-- half-close socket
+--
+-- @return
+--
+--]]
+function tcp_sync:shutdown()
+ return self._conn:shutdown()
+end
+
+exports.connect = function (args)
+ local is_ok, connection = rspamd_tcp.connect_sync(args)
+ if not is_ok then
+ return is_ok, connection
+ end
+
+ local instance = tcp_sync.new(connection)
+ instance._addr = string.format("%s:%s", tostring(args.host), tostring(args.port))
+
+ lua_util.debugm(N, args.task, 'Connected to %s', instance._addr)
+
+ return true, instance
+end
+
+return exports \ No newline at end of file