You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

lua_tcp_sync.lua 4.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. local rspamd_tcp = require "rspamd_tcp"
  2. local lua_util = require "lua_util"
  -- Table handed back to whoever requires this module.
  local exports = {}

  -- Module tag passed to lua_util.debugm() when logging.
  local N = 'tcp_sync'

  -- Prototype of a synchronous TCP wrapper. Everything stored here (fields and
  -- the methods attached further down) is copied into each object created by
  -- tcp_sync.new():
  --   _conn  underlying connection object
  --   _data  bytes already received but not yet handed to the caller
  --   _eof   cached "end of stream seen" flag
  --   _addr  "host:port" text used by __tostring
  local tcp_sync = {
    _conn = nil,
    _data = '',
    _eof = false,
    _addr = '',
  }

  -- Metatable shared by all wrapper objects; it only provides a readable
  -- string form.
  local metatable = {}
  metatable.__tostring = function(wrapper)
    return "class {tcp_sync connect to: " .. wrapper._addr .. "}"
  end
  -- Builds a wrapper object around an already established connection.
  -- Every entry of the tcp_sync prototype except the constructor itself is
  -- copied into the new object, so each object carries its own fields and
  -- method references.
  --
  -- @param connection underlying connection object to wrap
  -- @return the new wrapper object
  function tcp_sync.new(connection)
    local obj = {}

    for key, value in pairs(tcp_sync) do
      local is_constructor = (key == 'new')
      if not is_constructor then
        obj[key] = value
      end
    end

    obj._conn = connection

    return setmetatable(obj, metatable)
  end
  --[[[
  -- @method tcp_sync.read_once()
  --
  -- Returns the next available portion of data.
  -- If some data is already buffered in the wrapper (left over from
  -- read_until() or read_bytes()), it is returned immediately and the buffer
  -- is emptied; no read is performed on the underlying connection.
  -- Otherwise a single read_once() is issued on the underlying connection
  -- and its result is passed through.
  --
  -- @return
  -- true, {data} if everything is fine
  -- false, {error message} otherwise
  --
  --]]
  function tcp_sync:read_once()
    if self._data:len() > 0 then
      local pending = self._data
      -- Reset to an empty string rather than nil: every reader (including
      -- this one) calls string methods on self._data or concatenates onto it,
      -- so a nil here would make the next read raise an error.
      self._data = ''
      return true, pending
    end

    local is_ok, data = self._conn:read_once()
    return is_ok, data
  end
  --[[[
  -- @method tcp_sync.read_until(pattern)
  --
  -- Keeps reading from the connection until the buffered data contains
  -- {pattern}. The pattern is searched as plain text, not as a Lua pattern.
  -- Everything before the first occurrence is returned; the occurrence
  -- itself is dropped and whatever follows it stays buffered for later reads.
  --
  -- @param {pattern} delimiter to look for
  -- @return
  -- true, {data} if everything is fine
  -- false, {error message} otherwise
  --
  --]]
  function tcp_sync:read_until(pattern)
    local from, to = self._data:find(pattern, 1, true)

    -- Pull more data in until the delimiter shows up in the buffer.
    while not from do
      local ok, chunk = self._conn:read_once()
      if not ok then
        return ok, chunk
      end
      self._data = self._data .. chunk
      from, to = self._data:find(pattern, 1, true)
    end

    local head = self._data:sub(1, from - 1)
    self._data = self._data:sub(to + 1)
    return true, head
  end
  --[[[
  -- @method tcp_sync.read_bytes(n)
  --
  -- Returns exactly {n} bytes from the stream, reading from the connection
  -- as many times as needed. Bytes received beyond the first {n} stay
  -- buffered for later reads.
  --
  -- @param {n} Number of bytes to read
  -- @return
  -- true, {data} if everything is fine
  -- false, {error message} otherwise
  --
  --]]
  function tcp_sync:read_bytes(n)
    local have_enough = self._data:len() >= n

    -- Accumulate until the buffer holds at least n bytes.
    while not have_enough do
      local ok, chunk = self._conn:read_once()
      if not ok then
        return ok, chunk
      end
      self._data = self._data .. chunk
      have_enough = self._data:len() >= n
    end

    local head = self._data:sub(1, n)
    self._data = self._data:sub(n + 1)
    return true, head
  end
  --[[[
  -- @method tcp_sync.read_until_eof()
  --
  -- Reads the stream until EOF is reached and returns everything collected,
  -- including data that was already buffered. The buffer is emptied.
  -- A failed read that coincides with EOF is treated as normal completion;
  -- any other failed read is reported to the caller.
  --
  -- @return
  -- true, {data} if everything is fine
  -- false, {error message} otherwise
  --
  --]]
  function tcp_sync:read_until_eof()
    while not self:eof() do
      local ok, chunk = self._conn:read_once()
      if ok then
        self._data = self._data .. chunk
      elseif self:eof() then
        -- the failure is just the connection being terminated,
        -- which is the condition we were waiting for
        break
      else
        return ok, chunk
      end
    end

    local everything = self._data
    self._data = ''
    return true, everything
  end
  --[[[
  -- @method tcp_sync.write(data)
  --
  -- Sends {data} to the stream by delegating to the underlying connection.
  --
  -- @param {data} Data to write
  -- @return
  -- true if everything is fine
  -- false, {error message} otherwise
  --
  --]]
  function tcp_sync:write(data)
    local conn = self._conn
    return conn:write(data)
  end
  --[[[
  -- @method tcp_sync.close()
  --
  -- Closes the connection by delegating to the underlying connection object.
  -- For a connection created with a task, closing happens automatically as
  -- soon as the task is done; calling this method explicitly helps to
  -- prevent connection leaks. The object itself is finally destroyed by the
  -- garbage collector.
  --
  -- @return whatever the underlying close() returns
  --
  --]]
  function tcp_sync:close()
    local conn = self._conn
    return conn:close()
  end
  --[[[
  -- @method tcp_sync.eof()
  --
  -- Tells whether EOF has been seen on the connection. Once the underlying
  -- connection reports EOF the answer is remembered in the wrapper and the
  -- connection is not asked again.
  --
  -- @return
  -- true if last "read" operation ended with EOF
  -- false otherwise
  --
  --]]
  function tcp_sync:eof()
    -- Already latched: answer from the cached flag.
    if self._eof then
      return self._eof
    end

    if self._conn:eof() then
      self._eof = true
    end

    return self._eof
  end
  --[[[
  -- @method tcp_sync.shutdown()
  --
  -- Half-closes the socket by delegating to the underlying connection.
  --
  -- @return whatever the underlying shutdown() returns
  --
  --]]
  function tcp_sync:shutdown()
    local conn = self._conn
    return conn:shutdown()
  end
  --[[[
  -- @function lua_tcp_sync.connect(args)
  --
  -- Opens a synchronous TCP connection via rspamd_tcp.connect_sync(args) and
  -- wraps it into a tcp_sync object. The fields args.host and args.port are
  -- used to build the printable address, and args.task is passed to the
  -- debug logger.
  --
  -- @param {args} table forwarded unchanged to rspamd_tcp.connect_sync
  -- @return
  -- true, {tcp_sync object} if the connection was established
  -- the failing status and second value of connect_sync otherwise
  --
  --]]
  function exports.connect(args)
    local ok, conn_or_err = rspamd_tcp.connect_sync(args)

    if ok then
      local wrapper = tcp_sync.new(conn_or_err)
      wrapper._addr = string.format("%s:%s", tostring(args.host), tostring(args.port))
      lua_util.debugm(N, args.task, 'Connected to %s', wrapper._addr)
      return true, wrapper
    end

    return ok, conn_or_err
  end
  186. return exports